Asked by: roboes · Asked: 11/12/2023 · Last edited by: roboes · Updated: 11/14/2023 · Views: 54
Interactive server-driven Flask SQL table with option to download filtered data in .csv
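Q:
I am trying to make a queried SQLite table downloadable as .csv, using Flask, SQLAlchemy (not Flask-SQLAlchemy) and DataTables (jQuery plugin) with the server-side processing option.
Basically, I have a query connected to a SQLite dataset that imports the data and returns it to DataTables.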
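Summary of what already works:
- A drop-down list (HTML select element) to choose which table should be displayed.
- Retrieving the data of the selected table using DataTables' "server-side processing" option, which allows large data to be fetched in multiple partial requests via Ajax (using offset and limit).
- Case-insensitive text filtering on each column, passed from the DataTables table to the Flask code (server-side) and retrieved accordingly. The user needs to press "Enter" after typing the text for the filter to be applied.
- Downloading the complete displayed table as .csv.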
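To illustrate the server-side processing flow summarised above, here is a minimal sketch using only the standard library (sqlite3 in place of SQLAlchemy). The `fetch_page` helper is hypothetical and not part of the app; it assumes DataTables sends `draw`, `start`, `length` and `columns[i][search][value]` in the query string and expects `draw`, `recordsTotal`, `recordsFiltered` and `data` in the reply.

```python
import sqlite3
from urllib.parse import parse_qs


def fetch_page(conn, query_string):
    """Build a DataTables-style response dict for one Ajax request (hypothetical helper)."""
    parsed = parse_qs(query_string, keep_blank_values=True)
    args = {key: values[0] for key, values in parsed.items()}
    start = int(args.get('start', 0))
    length = int(args.get('length', 10))
    column_filter = args.get('columns[1][search][value]', '')

    where, params = '', []
    if column_filter:
        # SQLite's LIKE is case-insensitive for ASCII characters by default
        where = ' WHERE column_a LIKE ?'
        params.append(f'%{column_filter}%')

    # Row count before and after filtering
    records_total = conn.execute('SELECT COUNT(*) FROM table_a').fetchone()[0]
    records_filtered = conn.execute(
        'SELECT COUNT(*) FROM table_a' + where, params
    ).fetchone()[0]

    # Only the requested slice is read from the database
    rows = conn.execute(
        'SELECT column_date, column_a FROM table_a' + where
        + ' ORDER BY column_date, id LIMIT ? OFFSET ?',
        params + [length, start],
    ).fetchall()

    return {
        'draw': int(args.get('draw', 1)),
        'recordsTotal': records_total,
        'recordsFiltered': records_filtered,
        'data': [{'column_date': d, 'column_a': a} for d, a in rows],
    }


# Sample data mirroring "table_a"
conn = sqlite3.connect(':memory:')
conn.execute(
    'CREATE TABLE table_a (id INTEGER PRIMARY KEY, column_date TEXT, column_a TEXT)'
)
conn.executemany(
    'INSERT INTO table_a (column_date, column_a) VALUES (?, ?)',
    [('2023-11-10', '1'), ('2023-10-31', '1'), ('2023-10-31', '2')],
)

response = fetch_page(conn, 'draw=3&start=0&length=50&columns[1][search][value]=2')
print(response)
```

In this sketch `recordsTotal` is counted before filtering and `recordsFiltered` after it, which is how DataTables distinguishes the two in its paging summary.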
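The problem I am facing is that once I enter a filter in any column, the downloaded .csv still contains the complete table rather than the filtered one. For example, when I search for "2" in Column A and press Enter, I see the filtered data as expected.
But when I press download, the complete table data is downloaded instead of the filtered data. I only want to download the filtered data. Does anyone know how to fix this? Thanks in advance.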
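One way to look at the symptom, as a rough sketch rather than a confirmed diagnosis: the Ajax request that DataTables issues carries the column filters, while the download form only submits its `table-selector` field. The query strings below are shortened, made-up examples, and the form body is shown URL-encoded for brevity even though the real form uses multipart/form-data.

```python
from urllib.parse import parse_qs

FILTER_KEY = 'columns[1][search][value]'

# Query string of the Ajax GET request sent by DataTables (shortened example)
ajax_args = parse_qs(
    'draw=2&start=0&length=50&columns[1][search][value]=2&orderColumns=True'
)

# Fields submitted by the download form: only the <select name="table-selector">
download_form = parse_qs('table-selector=table_a')

# Same lookup with an empty default on both requests
ajax_filter = ajax_args.get(FILTER_KEY, [''])[0]
download_filter = download_form.get(FILTER_KEY, [''])[0]

print(repr(ajax_filter), repr(download_filter))
```

If the filter lookup falls back to an empty default on the download request, no filter is applied to the query, which would match the unfiltered .csv described above.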
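Below I provide a complete working code, including a sample SQLite "test.db" dataset with 2 tables ("table_a" and "table_b"), to reproduce the issue. I am using the latest versions of the packages (Flask==3.0.0, pandas==2.1.3 and SQLAlchemy==2.0.22).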
/templates/index.html
<!DOCTYPE html>
<html>
<head>
<title>Flask Application</title>
<link rel="stylesheet" href="https://cdn.datatables.net/1.13.7/css/jquery.dataTables.css"/>
<link rel="stylesheet" href="https://cdn.datatables.net/1.13.7/css/dataTables.bootstrap5.min.css"/>
<style>
.text-wrap{
white-space:normal;
}
.width-200{
width:200px;
}
</style>
</head>
<body>
<h1>Table Viewer</h1>
<form id="selector" action="{{ url_for('.database_download') }}" method="POST" enctype="multipart/form-data">
Select table:
<select id="table-selector" name="table-selector">
<!-- <option disabled selected value>Select a Dataset</option> -->
<option value="table_a" selected>Table A</option>
<option value="table_b">Table B</option>
</select>
</form>
<br>
<p>
<input type="checkbox" id="order-columns" value="True" checked> Order columns by date</p>
<p><input type="button" id="button-database-selector" value="Select Table" /></p>
<br>
<br>
<br>
<div id="content-a" class="table table-striped table-hover" style="display:none; width:100%; table-layout:fixed; overflow-x:auto;">
<table id="table-a">
<thead>
<tr>
<th></th>
<th></th>
</tr>
<tr>
<th></th>
<th></th>
</tr>
</thead>
</table>
</div>
<div id="content-b" class="table table-striped table-hover" style="display:none; width:100%; table-layout:fixed; overflow-x:auto;">
<table id="table-b">
<thead>
<tr>
<th></th>
<th></th>
</tr>
<tr>
<th></th>
<th></th>
</tr>
</thead>
</table>
</div>
<div id="download" style="display:none">
<p>Download table: <input type="submit" value="Download" form="selector"></p>
</div>
<script type="text/javascript" src="https://code.jquery.com/jquery-3.7.0.js"></script>
<script type="text/javascript" src="https://cdn.datatables.net/1.13.7/js/jquery.dataTables.min.js"></script>
<script type="text/javascript" src="https://cdn.datatables.net/1.13.7/js/dataTables.bootstrap5.min.js"></script>
<script type="text/javascript">
$(document).ready( function() {
$('input[id="button-database-selector"]').click( function() {
if ($('input[id="order-columns"]').prop('checked')) {
var order_columns = 'True';
} else {
var order_columns = '';
}
var database_selected = $('select[id="table-selector"]').val();
// Display download button
$('div[id="download"]').css('display', 'block');
if (database_selected == 'table_a') {
// Hide all tables
$('div[id^="content-"]').hide();
// Display table
$('div[id="content-a"]').css('display', 'block');
// Setup - add a text input to each header cell
$('#table-a thead tr:eq(1) th').each( function () {
var title = $('#table-a thead tr:eq(0) th').eq( $(this).index() ).text();
$(this).html( '<input type="text" placeholder="Search '+title+'" />' );
} );
var table = $('table[id="table-a"]').DataTable({
'pageLength': 50,
'ordering': false,
// 'scrollX': true,
// 'sScrollX': '100%',
'searching': true,
'dom': 'lrtip',
'orderCellsTop': true,
'serverSide': true,
'bDestroy': true,
'ajax': {
'url': "{{ url_for('.table_a') }}",
'type': 'GET',
'data': {
'orderColumns': order_columns,
},
},
'columns': [
{ 'title': 'Column Date', 'data': 'column_date' },
{ 'title': 'Column A', 'data': 'column_a' },
],
'columnDefs': [
{
'render': function (data, type, full, meta) {
return "<div class='text-wrap width-200'>" + data + "</div>";
},
'targets': '_all',
}
],
});
// Apply the search
table.columns().every( function (index) {
$('#table-a thead tr:eq(1) th:eq(' + index + ') input').on('keydown', function (event) {
if (event.keyCode == 13) {
table.column($(this).parent().index() + ':visible')
.search(this.value)
.draw();
}
});
});
} else if (database_selected === 'table_b') {
// Hide all tables
$('div[id^="content-"]').hide();
// Display table
$('div[id="content-b"]').css('display', 'block');
// Setup - add a text input to each header cell
$('#table-b thead tr:eq(1) th').each( function () {
var title = $('#table-b thead tr:eq(0) th').eq( $(this).index() ).text();
$(this).html( '<input type="text" placeholder="Search '+title+'" />' );
} );
var table = $('table[id="table-b"]').DataTable({
'pageLength': 50,
'ordering': false,
// 'scrollX': true,
// 'sScrollX': '100%',
'searching': true,
'dom': 'lrtip',
'orderCellsTop': true,
'serverSide': true,
'bDestroy': true,
'ajax': {
'url': "{{ url_for('.table_b') }}",
'type': 'GET',
'data': {
'orderColumns': order_columns,
},
},
'columns': [
{ 'title': 'Column Date', 'data': 'column_date' },
{ 'title': 'Column B', 'data': 'column_b' },
],
'columnDefs': [
{
'render': function (data, type, full, meta) {
return "<div class='text-wrap width-200'>" + data + "</div>";
},
'targets': '_all',
}
],
});
// Apply the search
table.columns().every( function (index) {
$('#table-b thead tr:eq(1) th:eq(' + index + ') input').on('keydown', function (event) {
if (event.keyCode == 13) {
table.column($(this).parent().index() + ':visible')
.search(this.value)
.draw();
}
});
});
}
});
});
</script>
</body>
</html>
/app.py
# Import packages
from datetime import date
from io import BytesIO

from fastapi.encoders import jsonable_encoder
from flask import Flask, render_template, request, jsonify, send_file
import pandas as pd
from sqlalchemy import create_engine, Column, DateTime, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker

app = Flask(__name__)

# Create a SQLite database engine
engine = create_engine(
    url='sqlite:///test.db',
    echo=False,
)

# Create a session
Session = sessionmaker(bind=engine)
session = Session()

# Create a base class for declarative models
Base = declarative_base()


# Define "table_a" model
class table_a(Base):
    __tablename__ = 'table_a'
    id = Column(Integer, primary_key=True)
    column_date = Column(DateTime)
    column_a = Column(String)


# Define "table_b" model
class table_b(Base):
    __tablename__ = 'table_b'
    id = Column(Integer, primary_key=True)
    column_date = Column(DateTime)
    column_b = Column(String)


with app.app_context():
    Base.metadata.drop_all(bind=engine)
    Base.metadata.create_all(bind=engine)

    # Add sample data to the "table_a" table
    session.add_all(
        [
            table_a(column_date=date(2023, 11, 10), column_a='1'),
            table_a(column_date=date(2023, 10, 31), column_a='1'),
            table_a(column_date=date(2023, 10, 31), column_a='2'),
        ],
    )

    # Add sample data to the "table_b" table
    session.add_all(
        [
            table_b(column_date=date(2023, 11, 1), column_b='1'),
            table_b(column_date=date(2023, 11, 11), column_b='1'),
        ],
    )

    session.commit()
def read_table(*, table_name):
    # Read a SQLite database engine
    engine = create_engine(
        url='sqlite:///test.db',
        echo=False,
    )

    # Create a session
    Session = sessionmaker(bind=engine)
    session = Session()

    # Create a base class for declarative models
    Base = declarative_base()

    # Define "table_a" model
    class table_a(Base):
        __tablename__ = 'table_a'
        id = Column(Integer, primary_key=True)
        column_date = Column(DateTime)
        column_a = Column(String)

    # Define "table_b" model
    class table_b(Base):
        __tablename__ = 'table_b'
        id = Column(Integer, primary_key=True)
        column_date = Column(DateTime)
        column_b = Column(String)

    if table_name == 'table_a':
        sql_query = session.query(table_a)

        # Filters
        filter_column_date = request.args.get(
            key='columns[0][search][value]',
            default='',
            type=str,
        )
        filter_column_a = request.args.get(
            key='columns[1][search][value]',
            default='',
            type=str,
        )

        # Other parameters
        order_columns = request.args.get(key='orderColumns', default='', type=bool)

        if filter_column_date != '':
            sql_query = sql_query.filter(
                table_a.column_date.ilike(
                    f'%{filter_column_date}%',
                ),
            )

        if filter_column_a != '':
            sql_query = sql_query.filter(
                table_a.column_a.ilike(
                    f'%{filter_column_a}%',
                ),
            )

        if order_columns is True:
            sql_query = sql_query.order_by(table_a.column_date.asc())

    if table_name == 'table_b':
        sql_query = session.query(table_b)

        # Filters
        filter_column_date = request.args.get(
            key='columns[0][search][value]',
            default='',
            type=str,
        )
        filter_column_b = request.args.get(
            key='columns[1][search][value]',
            default='',
            type=str,
        )

        # Other parameters
        order_columns = request.args.get(key='orderColumns', default='', type=bool)

        if filter_column_date != '':
            sql_query = sql_query.filter(
                table_b.column_date.ilike(
                    f'%{filter_column_date}%',
                ),
            )

        if filter_column_b != '':
            sql_query = sql_query.filter(
                table_b.column_b.ilike(
                    f'%{filter_column_b}%',
                ),
            )

        if order_columns is True:
            sql_query = sql_query.order_by(table_b.column_date.asc())

    return sql_query
@app.route('/')
def home():
    return render_template(template_name_or_list='index.html')


@app.route(rule='/table-a', methods=['GET'])
def table_a():
    # Get the pagination parameters from the request
    start = request.args.get(key='start', default=0, type=int)
    length = request.args.get(key='length', default=10, type=int)

    sql_query = read_table(table_name='table_a')

    response = {
        'draw': request.args.get(key='draw', default=1, type=int),
        'recordsTotal': sql_query.count(),
        'recordsFiltered': sql_query.count(),
        'data': jsonable_encoder(sql_query.offset(start).limit(length).all()),
    }

    return jsonify(response)


@app.route(rule='/table-b', methods=['GET'])
def table_b():
    # Get the pagination parameters from the request
    start = request.args.get(key='start', default=0, type=int)
    length = request.args.get(key='length', default=10, type=int)

    sql_query = read_table(table_name='table_b')

    response = {
        'draw': request.args.get(key='draw', default=1, type=int),
        'recordsTotal': sql_query.count(),
        'recordsFiltered': sql_query.count(),
        'data': jsonable_encoder(sql_query.offset(start).limit(length).all()),
    }

    return jsonify(response)
@app.route(rule='/database-download', methods=['GET', 'POST'])
def database_download():
    if request.method == 'POST':
        table = request.form.get('table-selector')

        sql_query = read_table(table_name=f'{table}')

        # Create a binary data memory file
        buffer = BytesIO()

        df = pd.read_sql(
            sql=sql_query.statement,
            con=create_engine(
                url='sqlite:///test.db',
                echo=False,
            ),
            index_col=None,
            coerce_float=True,
            params=None,
            parse_dates=None,
            chunksize=None,
        )

        # DataFrame to buffer
        df.to_csv(
            path_or_buf=buffer,
            sep=',',
            na_rep='',
            header=True,
            index=False,
            index_label=None,
            encoding='utf-8-sig',
        )

        # Change the stream position to the start of the stream
        buffer.seek(0)

        return send_file(
            path_or_file=buffer,
            download_name=f'{table}.csv',
            as_attachment=True,
        )


if __name__ == '__main__':
    app.run(host='localhost', port=5011, debug=True)
A:
Answer #1 by roboes, 11/13/2023 (0 upvotes)
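I found a solution: using Flask's flask.session, I store the request arguments (column filters) in the current session and retrieve them as a werkzeug.datastructures.ImmutableMultiDict object, so that the .csv is downloaded with the applied filters once the download is requested. The final code can be found below: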
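The idea can be sketched in isolation with only the standard library. This is an illustration under assumptions, not the app's code: a plain dict (`fake_session`) stands in for flask.session, a JSON round trip imitates the serialisation a cookie-based session applies, and `filter_rows`, `handle_table_request` and `handle_download` are hypothetical names.

```python
import csv
import io
import json

# Hypothetical stand-in for flask.session; the JSON round trip below imitates
# the serialisation applied by a cookie-based session.
fake_session = {}

ROWS = [
    {'column_date': '2023-11-10', 'column_a': '1'},
    {'column_date': '2023-10-31', 'column_a': '1'},
    {'column_date': '2023-10-31', 'column_a': '2'},
]


def filter_rows(args):
    """Apply the case-insensitive column filter found in the request arguments."""
    needle = args.get('columns[1][search][value]', '').lower()
    return [row for row in ROWS if needle in row['column_a'].lower()]


def handle_table_request(args):
    """Table endpoint: remember the arguments, then return the filtered rows."""
    fake_session['REQUEST_DICT'] = json.dumps(args)
    return filter_rows(args)


def handle_download():
    """Download endpoint: reuse the remembered arguments to build the CSV text."""
    args = json.loads(fake_session.get('REQUEST_DICT', '{}'))
    buffer = io.StringIO()
    writer = csv.DictWriter(
        buffer,
        fieldnames=['column_date', 'column_a'],
        lineterminator='\n',
    )
    writer.writeheader()
    writer.writerows(filter_rows(args))
    return buffer.getvalue()


# The table is filtered first, then the download reuses the same filter
handle_table_request({'draw': '2', 'columns[1][search][value]': '2'})
csv_text = handle_download()
print(csv_text)
```

Because only the most recent table request is remembered, the download reflects whatever filter was applied last in that session, and falls back to the unfiltered data when nothing has been stored yet.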
/templates/index.html
<!DOCTYPE html>
<html>
<head>
<title>Flask Application</title>
<link rel="stylesheet" href="https://cdn.datatables.net/1.13.7/css/jquery.dataTables.css"/>
<link rel="stylesheet" href="https://cdn.datatables.net/1.13.7/css/dataTables.bootstrap5.min.css"/>
<style>
.text-wrap{
white-space:normal;
}
.width-200{
width:200px;
}
</style>
</head>
<body>
<h1>Table Viewer</h1>
<form id="selector" action="{{ url_for('.database_download') }}" method="POST" enctype="multipart/form-data">
Select table:
<select id="table-selector" name="table-selector">
<!-- <option disabled selected value>Select a Dataset</option> -->
<option value="table_a" selected>Table A</option>
<option value="table_b">Table B</option>
</select>
</form>
<br>
<p>
<input type="checkbox" id="order-columns" value="True" checked> Order columns by date</p>
<p><input type="button" id="button-database-selector" value="Select Table" /></p>
<br>
<br>
<br>
<div id="content-a" class="table table-striped table-hover" style="display:none; width:100%; table-layout:fixed; overflow-x:auto;">
<table id="table-a">
<thead>
<tr>
<th></th>
<th></th>
</tr>
<tr>
<th></th>
<th></th>
</tr>
</thead>
</table>
</div>
<div id="content-b" class="table table-striped table-hover" style="display:none; width:100%; table-layout:fixed; overflow-x:auto;">
<table id="table-b">
<thead>
<tr>
<th></th>
<th></th>
</tr>
<tr>
<th></th>
<th></th>
</tr>
</thead>
</table>
</div>
<div id="download" style="display:none">
<p>Download table: <input type="submit" value="Download" form="selector"></p>
</div>
<script type="text/javascript" src="https://code.jquery.com/jquery-3.7.0.js"></script>
<script type="text/javascript" src="https://cdn.datatables.net/1.13.7/js/jquery.dataTables.min.js"></script>
<script type="text/javascript" src="https://cdn.datatables.net/1.13.7/js/dataTables.bootstrap5.min.js"></script>
<script type="text/javascript">
$(document).ready( function() {
$('input[id="button-database-selector"]').click( function() {
if ($('input[id="order-columns"]').prop('checked')) {
var order_columns = 'True';
} else {
var order_columns = '';
}
var database_selected = $('select[id="table-selector"]').val();
// Display download button
$('div[id="download"]').css('display', 'block');
if (database_selected == 'table_a') {
// Hide all tables
$('div[id^="content-"]').hide();
// Display table
$('div[id="content-a"]').css('display', 'block');
// Setup - add a text input to each header cell
$('#table-a thead tr:eq(1) th').each( function () {
var title = $('#table-a thead tr:eq(0) th').eq( $(this).index() ).text();
$(this).html( '<input type="text" placeholder="Search '+title+'" />' );
} );
var table = $('table[id="table-a"]').DataTable({
'pageLength': 50,
'ordering': false,
// 'scrollX': true,
// 'sScrollX': '100%',
'searching': true,
'dom': 'lrtip',
'orderCellsTop': true,
'serverSide': true,
'bDestroy': true,
'ajax': {
'url': "{{ url_for('.table_a') }}",
'type': 'GET',
'data': {
'orderColumns': order_columns,
},
},
'columns': [
{ 'title': 'Column Date', 'data': 'column_date' },
{ 'title': 'Column A', 'data': 'column_a' },
],
'columnDefs': [
{
'render': function (data, type, full, meta) {
return "<div class='text-wrap width-200'>" + data + "</div>";
},
'defaultContent': '',
'targets': '_all',
}
],
});
// Apply the search
table.columns().every( function (index) {
$('#table-a thead tr:eq(1) th:eq(' + index + ') input').on('keydown', function (event) {
if (event.keyCode == 13) {
table.column($(this).parent().index() + ':visible')
.search(this.value)
.draw();
}
});
});
} else if (database_selected === 'table_b') {
// Hide all tables
$('div[id^="content-"]').hide();
// Display table
$('div[id="content-b"]').css('display', 'block');
// Setup - add a text input to each header cell
$('#table-b thead tr:eq(1) th').each( function () {
var title = $('#table-b thead tr:eq(0) th').eq( $(this).index() ).text();
$(this).html( '<input type="text" placeholder="Search '+title+'" />' );
} );
var table = $('table[id="table-b"]').DataTable({
'pageLength': 50,
'ordering': false,
// 'scrollX': true,
// 'sScrollX': '100%',
'searching': true,
'dom': 'lrtip',
'orderCellsTop': true,
'serverSide': true,
'bDestroy': true,
'ajax': {
'url': "{{ url_for('.table_b') }}",
'type': 'GET',
'data': {
'orderColumns': order_columns,
},
},
'columns': [
{ 'title': 'Column Date', 'data': 'column_date' },
{ 'title': 'Column B', 'data': 'column_b' },
],
'columnDefs': [
{
'render': function (data, type, full, meta) {
return "<div class='text-wrap width-200'>" + data + "</div>";
},
'defaultContent': '',
'targets': '_all',
}
],
});
// Apply the search
table.columns().every( function (index) {
$('#table-b thead tr:eq(1) th:eq(' + index + ') input').on('keydown', function (event) {
if (event.keyCode == 13) {
table.column($(this).parent().index() + ':visible')
.search(this.value)
.draw();
}
});
});
}
});
});
</script>
</body>
</html>
/app.py
# Import packages
from datetime import date
from io import BytesIO
import secrets

from fastapi.encoders import jsonable_encoder
from flask import Flask, render_template, request, jsonify, send_file, session
import pandas as pd
from sqlalchemy import create_engine, Column, DateTime, Integer, String
from sqlalchemy.orm import declarative_base, sessionmaker
from werkzeug.datastructures import ImmutableMultiDict

app = Flask(__name__)

# Create a SQLite database engine
engine = create_engine(
    url='sqlite:///test.db',
    echo=False,
)

# Create a session
Session = sessionmaker(bind=engine)
session_db = Session()

# Create a base class for declarative models
Base = declarative_base()


# Define "table_a" model
class table_a(Base):
    __tablename__ = 'table_a'
    id = Column(Integer, primary_key=True)
    column_date = Column(DateTime)
    column_a = Column(String)


# Define "table_b" model
class table_b(Base):
    __tablename__ = 'table_b'
    id = Column(Integer, primary_key=True)
    column_date = Column(DateTime)
    column_b = Column(String)


with app.app_context():
    Base.metadata.drop_all(bind=engine)
    Base.metadata.create_all(bind=engine)

    # Add sample data to the "table_a" table
    session_db.add_all(
        [
            table_a(column_date=date(2023, 11, 10), column_a='1'),
            table_a(column_date=date(2023, 10, 31), column_a='1'),
            table_a(column_date=date(2023, 10, 31), column_a='2'),
        ],
    )

    # Add sample data to the "table_b" table
    session_db.add_all(
        [
            table_b(column_date=date(2023, 11, 1), column_b='1'),
            table_b(column_date=date(2023, 11, 11), column_b='1'),
        ],
    )

    session_db.commit()
def read_table(*, table_name, request_dict):
    # Read a SQLite database engine
    engine = create_engine(
        url='sqlite:///test.db',
        echo=False,
    )

    # Create a session
    Session = sessionmaker(bind=engine)
    session_db = Session()

    # Create a base class for declarative models
    Base = declarative_base()

    # Define "table_a" model
    class table_a(Base):
        __tablename__ = 'table_a'
        id = Column(Integer, primary_key=True)
        column_date = Column(DateTime)
        column_a = Column(String)

    # Define "table_b" model
    class table_b(Base):
        __tablename__ = 'table_b'
        id = Column(Integer, primary_key=True)
        column_date = Column(DateTime)
        column_b = Column(String)

    if table_name == 'table_a':
        sql_query = session_db.query(table_a)

        # Filters
        filter_column_date = request_dict.get(
            key='columns[0][search][value]',
            default='',
            type=str,
        )
        filter_column_a = request_dict.get(
            key='columns[1][search][value]',
            default='',
            type=str,
        )

        # Other parameters
        order_columns = request_dict.get(key='orderColumns', default='', type=bool)

        if filter_column_date != '':
            sql_query = sql_query.filter(
                table_a.column_date.ilike(
                    f'%{filter_column_date}%',
                ),
            )

        if filter_column_a != '':
            sql_query = sql_query.filter(
                table_a.column_a.ilike(
                    f'%{filter_column_a}%',
                ),
            )

        if order_columns is True:
            sql_query = sql_query.order_by(table_a.column_date.asc())

    if table_name == 'table_b':
        sql_query = session_db.query(table_b)

        # Filters
        filter_column_date = request_dict.get(
            key='columns[0][search][value]',
            default='',
            type=str,
        )
        filter_column_b = request_dict.get(
            key='columns[1][search][value]',
            default='',
            type=str,
        )

        # Other parameters
        order_columns = request_dict.get(key='orderColumns', default='', type=bool)

        if filter_column_date != '':
            sql_query = sql_query.filter(
                table_b.column_date.ilike(
                    f'%{filter_column_date}%',
                ),
            )

        if filter_column_b != '':
            sql_query = sql_query.filter(
                table_b.column_b.ilike(
                    f'%{filter_column_b}%',
                ),
            )

        if order_columns is True:
            sql_query = sql_query.order_by(table_b.column_date.asc())

    return sql_query
@app.route('/')
def home():
    return render_template(template_name_or_list='index.html')


@app.route(rule='/table-a', methods=['GET'])
def table_a():
    # Get request arguments from the request
    request_dict = request.args

    # Store request arguments in current session
    session['REQUEST_DICT'] = request_dict

    # Get the pagination arguments from the request
    start = request_dict.get(key='start', default=0, type=int)
    length = request_dict.get(key='length', default=10, type=int)

    sql_query = read_table(table_name='table_a', request_dict=request_dict)

    response = {
        'draw': request_dict.get(key='draw', default=1, type=int),
        'recordsTotal': sql_query.count(),
        'recordsFiltered': sql_query.count(),
        'data': jsonable_encoder(obj=sql_query.offset(start).limit(length).all(), exclude_none=False, sqlalchemy_safe=True),
    }

    return jsonify(response)


@app.route(rule='/table-b', methods=['GET'])
def table_b():
    # Get request arguments from the request
    request_dict = request.args

    # Store request arguments in current session
    session['REQUEST_DICT'] = request_dict

    # Get the pagination arguments from the request
    start = request_dict.get(key='start', default=0, type=int)
    length = request_dict.get(key='length', default=10, type=int)

    sql_query = read_table(table_name='table_b', request_dict=request_dict)

    response = {
        'draw': request_dict.get(key='draw', default=1, type=int),
        'recordsTotal': sql_query.count(),
        'recordsFiltered': sql_query.count(),
        'data': jsonable_encoder(obj=sql_query.offset(start).limit(length).all(), exclude_none=False, sqlalchemy_safe=True),
    }

    return jsonify(response)
@app.route(rule='/database-download', methods=['GET', 'POST'])
def database_download():
    if request.method == 'POST':
        table = request.form.get('table-selector')

        # Retrieve the request arguments stored by the last table request
        request_dict = ImmutableMultiDict(session['REQUEST_DICT'])

        sql_query = read_table(table_name=f'{table}', request_dict=request_dict)

        # Create a binary data memory file
        buffer = BytesIO()

        df = pd.read_sql(
            sql=sql_query.statement,
            con=create_engine(
                url='sqlite:///test.db',
                echo=False,
            ),
            index_col=None,
            coerce_float=True,
            params=None,
            parse_dates=None,
            chunksize=None,
        )

        # DataFrame to buffer
        df.to_csv(
            path_or_buf=buffer,
            sep=',',
            na_rep='',
            header=True,
            index=False,
            index_label=None,
            encoding='utf-8-sig',
        )

        # Change the stream position to the start of the stream
        buffer.seek(0)

        return send_file(
            path_or_file=buffer,
            download_name=f'{table}.csv',
            as_attachment=True,
        )


if __name__ == '__main__':
    app.secret_key = secrets.token_urlsafe(nbytes=16)
    app.run(host='localhost', port=5011, debug=True)