Skip to content

Instantly share code, notes, and snippets.

@evan-burke
Last active April 15, 2020 22:02
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save evan-burke/53f990c6a68ccabc91eefd71b5483441 to your computer and use it in GitHub Desktop.
Save evan-burke/53f990c6a68ccabc91eefd71b5483441 to your computer and use it in GitHub Desktop.
psycopg2 execute_values wrapper for accurate row counts
# One of the fastest ways to insert bulk data into Postgres (at least, aside from COPY) is using the psycopg2 extras function execute_values.
# However, this doesn't return an accurate row count value - instead, it just returns the row count for the last page inserted.
# This wraps the execute_values function with its own pagination to return an accurate count of rows inserted.
# Performance is approximately equivalent to underlying execute_values function - within 5-10% or so in my brief tests.
import psycopg2
import psycopg2.extras
import math
db_connection_string = "dbname=EDITME host=EDITME"
def query_function(data, cur, page_size=500):
"""Runs execute_values function. Update with your values and query template for 'data' input.
'data' should be a list of dicts. Map dict keys to insert columns, if needed, in query_template."""
insert_query = """insert into table_name (column1, column2) values %s"""
query_template = """( %(column1)s, %(column2)s )"""
# could probably support other sequences here too
if type(data) is not list:
print("Error: query_function() requires 'data' input be a list of dicts in order to perfom the insert correctly.")
return 0
psycopg2.extras.execute_values(cur, insert_query, data,
template=query_template, page_size=page_size)
rowcount = cur.rowcount
return rowcount
def insert_paginator(data, page_size=500, debug=0):
""" Psycopg2's execute_values method doesn't retain accurate insert row counts if data length > page size.
This wraps an execute_values query function to get accurate counts."""
with psycopg2.connect(db_connection_string) as conn:
with conn.cursor() as cur:
if len(data) > page_size:
# we'll need to do pagination and calculate our own # of rows inserted.
pages_needed = math.ceil(len(data) / page_size)
if debug:
print("we'll need to do", pages_needed, "page inserts of",
page_size, "values each to get through the", len(data), "input values")
detail_rows_inserted = [] # source and inserted counts by page.
rows_inserted = 0
for i in range(pages_needed):
page_num = i
start_row = i * page_size
if page_num < (pages_needed - 1):
end_row = ((i+1) * page_size)
else:
# last page.
end_row = len(data)
page_rows = query_function(data[start_row:end_row], page_size=page_size, cur=cur)
rows_inserted += page_rows
if debug:
detail_dict = { "page": page_num,
"start row number": start_row,
"source row count": len(data[start_row:end_row]),
"inserted row count": page_rows }
print(detail_dict)
detail_rows_inserted.append(detail_dict)
else:
rows_inserted = query_function(data, page_size=page_size, cur=cur)
if debug:
return rows_inserted, detail_rows_inserted
else:
return rows_inserted
@billyfung
Copy link

It's interesting that the row count isn't returned properly, I might need to look into psycopg2 to figure out why

@evan-burke
Copy link
Author

It is intentional for the execute_values method for some reasons discussed briefly here (and in similar github issues): psycopg/psycopg2#540
Still, it's a bit frustrating as a user. The execute_values method is relatively new, having been released in Feb 2017 - so I think they will end up fixing that behavior at some point.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment