Django + gevent + psycopg2 download whole PostgreSQL table as CSV
import csv
import logging

import gevent
import gevent.queue  # Explicit import so gevent.queue.Queue / gevent.queue.Empty resolve
from psycopg2 import (
    connect,
    sql,
)

from django.conf import (
    settings,
)
from django.http import (
    StreamingHttpResponse,
)

logger = logging.getLogger()


def csv_view(request, schema, table):
    ''' Returns a StreamingHttpResponse that contains a CSV of an entire database table

    Copyright (c) 2019 Department for International Trade. All rights reserved
    This work is licensed under the terms of the MIT license.
    For a copy, see https://opensource.org/licenses/MIT.
    '''

    # Chosen by experimentation for our case.
    # You may find better values for yours
    cursor_itersize = 1000
    queue_size = 5

    bytes_queue = gevent.queue.Queue(maxsize=queue_size)

    def put_db_rows_to_queue():
        # The csv writer "writes" its output by calling a file-like object
        # with a `write` method.
        class PseudoBuffer:
            def write(self, value):
                return value

        csv_writer = csv.writer(PseudoBuffer())

        with \
                connect(settings.DATABASE_DSN) as conn, \
                conn.cursor(name='all_table_data') as cur:  # Named cursor => server-side cursor

            cur.itersize = cursor_itersize
            cur.arraysize = cursor_itersize

            # There is no ordering here. We just want a full dump.
            # Allows concurrent SELECT, INSERT, CREATE INDEX and CREATE TRIGGER, but
            # will block ALTER TABLE, DROP TABLE, TRUNCATE, VACUUM FULL
            cur.execute(sql.SQL("""
                SELECT
                    *
                FROM
                    {}.{}
            """).format(sql.Identifier(schema), sql.Identifier(table)))

            i = 0
            while True:
                rows = cur.fetchmany(cursor_itersize)
                if i == 0:
                    # Column names are not populated until the first row is fetched
                    bytes_queue.put(csv_writer.writerow([
                        column_desc[0] for column_desc in cur.description
                    ]), timeout=10)
                bytes_fetched = ''.join(
                    csv_writer.writerow(row) for row in rows
                ).encode('utf-8')
                bytes_queue.put(bytes_fetched, timeout=15)
                i += len(rows)
                if not rows:
                    break
            bytes_queue.put(csv_writer.writerow(['Number of rows: ' + str(i)]))

    def yield_bytes_from_queue():
        while put_db_rows_to_queue_job:
            try:
                # There will be a 0.1 second wait after the end of the data
                # from the db to when the connection is closed. Might be able
                # to avoid this, but KISS, and minor
                yield bytes_queue.get(timeout=0.1)
            except gevent.queue.Empty:
                pass

    def handle_exception(job):
        logger.exception(job.exception)

    put_db_rows_to_queue_job = gevent.spawn(put_db_rows_to_queue)
    put_db_rows_to_queue_job.link_exception(handle_exception)

    response = StreamingHttpResponse(yield_bytes_from_queue(), content_type='text/csv')
    response['Content-Disposition'] = f'attachment; filename="{schema}_{table}.csv"'
    return response
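The view can then be routed so that schema and table come from the URL. A minimal sketch, assuming a hypothetical urls.py next to the module above (here assumed to be views.py) and that settings.DATABASE_DSN is configured:

from django.urls import path

from .views import csv_view  # assumed module name

urlpatterns = [
    # e.g. GET /table-data/public/my_table downloads public.my_table as CSV
    path('table-data/<str:schema>/<str:table>', csv_view),
]

Since any table readable by the configured database user becomes downloadable, such a URL would normally sit behind authentication or other access control.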
That is really neat! One thing I would point out is to make sure to sanitize the inputs table and schema: because they are passed directly into the SQL query, they would be easy targets for SQL injection if exposed to end users. I personally would add the validation within this view itself.
@philipkiely I believe they are already being escaped, which forbids SQL injection: the inputs are wrapped in sql.Identifier inside

cur.execute(sql.SQL("""
    SELECT
        *
    FROM
        {}.{}
""").format(sql.Identifier(schema), sql.Identifier(table)))

This is very similar to the examples in the psycopg2 documentation at http://initd.org/psycopg/docs/sql.html
My mistake @michalc, thanks for pointing that out!
Usage notes
This view function only makes sense if Django is running under gevent, for example from a gevent worker created by gunicorn.
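With a hypothetical project module myproject and gevent installed, such a worker could be started with:

gunicorn myproject.wsgi:application --worker-class gevent

A sync worker would instead tie up a whole worker process for the duration of each download.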
If behind nginx, you might need to tune its proxy settings to avoid hitting timeouts if the client doesn't fetch data as fast as the intermediate queue is being filled from the database.
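One way to do that, if the timeout comes from the view's queue puts blocking once nginx starts reading from the upstream only at the client's pace, is to let nginx spool more of the response to disk. A sketch only, assuming gunicorn listens on 127.0.0.1:8000 and the hypothetical /table-data/ path used above:

location /table-data/ {
    proxy_pass http://127.0.0.1:8000;
    # Keep reading from gunicorn even when the client is slow, so the
    # view's bytes_queue.put calls don't hit their timeouts
    proxy_buffering on;
    proxy_max_temp_file_size 4096m;
}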