Skip to content

Instantly share code, notes, and snippets.

@hakib
Last active January 24, 2024 21:38
Show Gist options
  • Save hakib/7e723d2c113b947f7920bf55737e4d16 to your computer and use it in GitHub Desktop.
Save hakib/7e723d2c113b947f7920bf55737e4d16 to your computer and use it in GitHub Desktop.
# https://hakibenita.com/fast-load-data-python-postgresql
from typing import Iterator, Dict, Any, Optional
from urllib.parse import urlencode
import datetime
#------------------------ Profile
import time
from functools import wraps
from memory_profiler import memory_usage
def profile(fn):
    """Decorator that prints wall-clock time and peak memory delta per call.

    Fix: the original measured time with one call to ``fn`` and memory with a
    second call through ``memory_usage``, so every profiled loader ran twice
    (doubling the database work and skewing the benchmark).  Here ``fn`` runs
    exactly once, inside ``memory_usage``; the reported time therefore
    includes a small sampling overhead from the memory profiler.
    """
    @wraps(fn)
    def inner(*args, **kwargs):
        fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
        print(f'\n{fn.__name__}({fn_kwargs_str})')

        # Single execution: retval=True makes memory_usage return
        # (samples, fn's return value) so we don't need a second call.
        t = time.perf_counter()
        mem, retval = memory_usage(
            (fn, args, kwargs), retval=True, timeout=200, interval=1e-7,
        )
        elapsed = time.perf_counter() - t

        print(f'Time {elapsed:0.4}')
        print(f'Memory {max(mem) - min(mem)}')
        return retval
    return inner
#------------------------ Data
import requests
def iter_beers_from_api(page_size: int = 25) -> Iterator[Dict[str, Any]]:
    """Yield beer records from the punkapi REST API, fetching page by page.

    Stops when the API returns an empty page.
    """
    http = requests.Session()
    page = 1
    while True:
        query = urlencode({
            'page': page,
            'per_page': page_size
        })
        response = http.get('https://api.punkapi.com/v2/beers?' + query)
        response.raise_for_status()

        batch = response.json()
        if not batch:
            # An empty page means we have consumed the entire catalog.
            return

        yield from batch
        page += 1
def iter_beers_from_file(path: str) -> Iterator[Dict[str, Any]]:
    """Yield beer records from a JSON file containing a list of objects."""
    import json
    with open(path, 'r') as handle:
        records = json.load(handle)
    yield from records
#------------------------ Load
def create_staging_table(cursor) -> None:
    """Drop and recreate the staging_beers table on the given cursor.

    Each loader calls this first, so every benchmark starts from an
    empty table.
    """
    cursor.execute("""
        DROP TABLE IF EXISTS staging_beers;
        CREATE TABLE staging_beers (
            id                  INTEGER,
            name                TEXT,
            tagline             TEXT,
            first_brewed        DATE,
            description         TEXT,
            image_url           TEXT,
            abv                 DECIMAL,
            ibu                 DECIMAL,
            target_fg           DECIMAL,
            target_og           DECIMAL,
            ebc                 DECIMAL,
            srm                 DECIMAL,
            ph                  DECIMAL,
            attenuation_level   DECIMAL,
            brewers_tips        TEXT,
            contributed_by      TEXT,
            volume              INTEGER
        );
    """)
def parse_first_brewed(text: str) -> datetime.date:
    """Parse the API's ``first_brewed`` field into a date.

    Accepts ``'MM/YYYY'`` (day defaults to the 1st) or ``'YYYY'``
    (defaults to January 1st).

    Raises:
        ValueError: if the text matches neither format or a component is
            not numeric.
    """
    parts = text.split('/')
    if len(parts) == 2:
        return datetime.date(int(parts[1]), int(parts[0]), 1)
    if len(parts) == 1:
        return datetime.date(int(parts[0]), 1, 1)
    # Raise instead of `assert False`: asserts are stripped under `-O`,
    # which would let malformed input fall through silently.
    raise ValueError(f'Unknown date format: {text!r}')
@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Baseline loader: one INSERT statement executed per beer."""
    sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        for beer in beers:
            # Normalize the two fields that need transformation before insert.
            row = dict(beer)
            row['first_brewed'] = parse_first_brewed(beer['first_brewed'])
            row['volume'] = beer['volume']['value']
            cursor.execute(sql, row)
# http://initd.org/psycopg/docs/cursor.html#cursor.executemany
@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Loader using cursor.executemany over a fully materialized list."""
    sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        # Build the whole parameter list up front (list, not generator).
        rows = []
        for beer in beers:
            row = dict(beer)
            row['first_brewed'] = parse_first_brewed(beer['first_brewed'])
            row['volume'] = beer['volume']['value']
            rows.append(row)
        cursor.executemany(sql, rows)
@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Loader using cursor.executemany fed by a generator (lazy rows)."""
    sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """

    def transform(beer: Dict[str, Any]) -> Dict[str, Any]:
        # One normalized parameter dict per beer, produced lazily.
        row = dict(beer)
        row['first_brewed'] = parse_first_brewed(beer['first_brewed'])
        row['volume'] = beer['volume']['value']
        return row

    with connection.cursor() as cursor:
        create_staging_table(cursor)
        cursor.executemany(sql, (transform(beer) for beer in beers))
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_batch
import psycopg2.extras
@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    """Loader using psycopg2.extras.execute_batch over a materialized list."""
    sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        rows = []
        for beer in beers:
            row = dict(beer)
            row['first_brewed'] = parse_first_brewed(beer['first_brewed'])
            row['volume'] = beer['volume']['value']
            rows.append(row)
        psycopg2.extras.execute_batch(cursor, sql, rows, page_size=page_size)
@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    """Loader using psycopg2.extras.execute_batch fed by a generator."""
    sql = """
        INSERT INTO staging_beers VALUES (
            %(id)s,
            %(name)s,
            %(tagline)s,
            %(first_brewed)s,
            %(description)s,
            %(image_url)s,
            %(abv)s,
            %(ibu)s,
            %(target_fg)s,
            %(target_og)s,
            %(ebc)s,
            %(srm)s,
            %(ph)s,
            %(attenuation_level)s,
            %(brewers_tips)s,
            %(contributed_by)s,
            %(volume)s
        );
    """

    def transform(beer: Dict[str, Any]) -> Dict[str, Any]:
        row = dict(beer)
        row['first_brewed'] = parse_first_brewed(beer['first_brewed'])
        row['volume'] = beer['volume']['value']
        return row

    with connection.cursor() as cursor:
        create_staging_table(cursor)
        lazy_rows = (transform(beer) for beer in beers)
        psycopg2.extras.execute_batch(cursor, sql, lazy_rows, page_size=page_size)
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_values
import psycopg2.extras
@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Loader using psycopg2.extras.execute_values over a materialized list.

    Rows are positional tuples, so the order here must match the column
    order of staging_beers.
    """
    def to_tuple(beer: Dict[str, Any]) -> tuple:
        return (
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        )

    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, [to_tuple(beer) for beer in beers])
@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    """Loader using psycopg2.extras.execute_values fed by a generator.

    Rows are positional tuples, so the order here must match the column
    order of staging_beers.
    """
    def to_tuple(beer: Dict[str, Any]) -> tuple:
        return (
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        )

    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, (to_tuple(beer) for beer in beers), page_size=page_size)
# http://initd.org/psycopg/docs/cursor.html#cursor.copy_from
# https://docs.python.org/3.7/library/io.html?io.StringIO#io.StringIO
import io
def clean_csv_value(value: Optional[Any]) -> str:
    """Serialize a value for PostgreSQL's COPY text format.

    ``None`` becomes ``\\N``, COPY's NULL marker.  Fix over the original:
    literal backslashes and carriage returns are now escaped too — a lone
    backslash in the data would otherwise be interpreted by COPY as the
    start of an escape sequence, and a bare ``\\r`` as a row terminator.
    Note the field separator (``|`` in this file's callers) is still not
    escaped; data containing it would break the row.
    """
    if value is None:
        return r'\N'
    text = str(value)
    # Backslashes must be escaped first so the escapes added below are
    # not themselves double-escaped.
    text = text.replace('\\', '\\\\')
    text = text.replace('\n', '\\n')
    text = text.replace('\r', '\\r')
    return text
@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
    """Load beers via COPY from an in-memory pipe-delimited buffer.

    Bug fix: the original wrote ``contributed_by`` before ``brewers_tips``,
    swapping the two TEXT columns relative to the staging_beers column
    order (and relative to every other loader in this file).  The swap went
    unnoticed because the test only checks name/first_brewed/volume.
    """
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        csv_file_like_object = io.StringIO()
        for beer in beers:
            # Field order must match the staging_beers column order.
            csv_file_like_object.write('|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n')
        # Rewind so copy_from reads from the start of the buffer.
        csv_file_like_object.seek(0)
        cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')
class StringIteratorIO(io.TextIOBase):
    """A read-only, file-like view over an iterator of strings.

    Lets consumers that expect a file object (e.g. cursor.copy_from)
    stream from a generator without materializing all of it in memory.
    """

    def __init__(self, iter: Iterator[str]):
        self._iter = iter
        self._buff = ''  # leftover text from the last chunk pulled

    def readable(self) -> bool:
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        """Return up to ``n`` characters (everything buffered if n is None)."""
        # Refill the buffer from the iterator; skip past empty strings.
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break
        chunk = self._buff[:n]
        self._buff = self._buff[len(chunk):]
        return chunk

    def read(self, n: Optional[int] = None) -> str:
        """Read ``n`` characters, or everything remaining if n is None/negative."""
        pieces = []
        if n is None or n < 0:
            while True:
                chunk = self._read1()
                if not chunk:
                    break
                pieces.append(chunk)
        else:
            remaining = n
            while remaining > 0:
                chunk = self._read1(remaining)
                if not chunk:
                    break
                remaining -= len(chunk)
                pieces.append(chunk)
        return ''.join(pieces)
@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
    """Load beers via COPY, streaming CSV lines through StringIteratorIO.

    ``size`` is forwarded to copy_from as its read buffer size.
    """
    def to_line(beer: Dict[str, Any]) -> str:
        # One pipe-delimited line per beer, in staging_beers column order.
        return '|'.join(map(clean_csv_value, (
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']).isoformat(),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ))) + '\n'

    with connection.cursor() as cursor:
        create_staging_table(cursor)
        stream = StringIteratorIO(to_line(beer) for beer in beers)
        cursor.copy_from(stream, 'staging_beers', sep='|', size=size)
#------------------------ Benchmark
# Shared connection used by every benchmark below.
# NOTE(review): credentials are hard-coded for the author's local setup;
# adjust host/database/user before running elsewhere.
connection = psycopg2.connect(
host='localhost',
database='testload',
user='haki',
password=None,
)
# Autocommit so each loader's DROP/CREATE/INSERT takes effect immediately,
# without explicit COMMITs between benchmarks.
connection.set_session(autocommit=True)
import psycopg2.extras
def test(connection, n: int):
    """Sanity-check staging_beers: total row count plus two known records.

    Verifies both that ``n`` rows were loaded and that the per-row
    transformations (first_brewed parsing, volume flattening) were applied.
    """
    with connection.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cursor:
        # 1. Row count must match the number of beers we attempted to load.
        cursor.execute('SELECT COUNT(*) AS cnt FROM staging_beers')
        row = cursor.fetchone()
        assert row.cnt == n, f'Expected {n} rows, got {row.cnt} rows!'

        # 2. Spot-check two known beers (the data was loaded in duplicate,
        #    hence DISTINCT ON to collapse the copies).
        cursor.execute("""
            SELECT DISTINCT ON (id)
                *
            FROM
                staging_beers
            WHERE
                id IN (1, 235)
            ORDER BY
                id;
        """)
        first = cursor.fetchone()
        assert first.name == 'Buzz'
        assert first.first_brewed == datetime.date(2007, 9, 1)
        assert first.volume == 20

        second = cursor.fetchone()
        assert second.name == 'Mango And Chili Barley Wine'
        assert second.first_brewed == datetime.date(2016, 1, 1)
        assert second.volume == 20
beers = list(iter_beers_from_api()) * 100
insert_one_by_one(connection, beers)
test(connection, len(beers))
insert_executemany(connection, beers)
test(connection, len(beers))
insert_executemany_iterator(connection, beers)
test(connection, len(beers))
insert_execute_batch(connection, beers)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=1)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=100)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=1000)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=10000)
test(connection, len(beers))
insert_execute_values(connection, beers)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=1)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=100)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=1000)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=10000)
test(connection, len(beers))
copy_stringio(connection, beers)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 8)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 16)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 64)
test(connection, len(beers))
@Tiwo1991
Copy link

Hi I'm trying to slightly modify and apply your def copy_string_iterator function, since I need to filter the generated lines. I use itertools.filterfalse in the copy_string_iterator function code block:

))) + '\n') for row in filterfalse(lambda line: "#" in line.get('date'), log_lines) )

This results in an error when running the cursor.copy_from:

QueryCanceled: COPY from stdin failed: error in .read() call CONTEXT: COPY log_table, line 112910

My test file only has 112909 lines that match the filterfalse condition. Why does it try to copy line 112910?

Thank you for any assistance.

@hakib
Copy link
Author

hakib commented Mar 17, 2020

Hey @Tiwo1991, hard to tell.

I think the best way to check this is to dump the contents of your "clean" file into an actual file, and then try to inspect it, even to load it into the database. This way you'll be able to see exactly where the problem is.

@Tiwo1991
Copy link

@hakib, Thank you for the quick response. I dumped the contents into a text file in the following manner:

with open(r"text.txt","w") as file: w = csv.writer(file) for row in filterfalse(lambda line: "#" in line.get('date'), log_lines): json.dump(row,file) file.write('\n')

The last line in the txt file is an empty line. I expect it to be the culprit. If so I need to find a way for it to be skipped by copy_row() somehow.

@hakib
Copy link
Author

hakib commented Mar 17, 2020

@Tiwo1991 glad you found the problem. I think that instead of finding a way to skip this line it's best to look for a way to not write it in the first place (check if current line is last line and not write \n?)

@ededdneddyfan
Copy link

Just wanted to comment w/ a shoutout on this amazingly well explained and researched example. Great work @hakib

@hakib
Copy link
Author

hakib commented May 7, 2020

@ededdneddyfan Thanks :)

@onofre-marco
Copy link

Hi there, before my comment I will join the rest of the members congratulating @hakib. Great and useful job.

My question is more of a doubt. What is the difference between insert_execute_batch and insert_execute_batch_iterator??
The only difference I found is changing all_beers to iter_beers.

Best regards

@hakib
Copy link
Author

hakib commented Jan 20, 2021

It's a subtle difference indeed. The function insert_execute_batch first evaluates the entire list:

 all_beers = [{ ... }]

Notice how all_beers is a list, not a generator.

The function insert_execute_batch_iterator is using a generator instead:

iter_beers = ({ ... })

Notice that iter_beers uses round brackets, so the generator is not evaluated immediately, which should reduce the memory footprint.

@onofre-marco
Copy link

Thank you for the quick reply. I actually missed that.
Now with your post I'm learning how to optimize insert and some Python orthography at the same time :)

@Pkoiralap
Copy link

This is wonderful. I am eternally grateful for the efforts you have put in.

I am however facing a problem.

I am copying a 25MB-ish CSV file from an s3 bucket to a Postgres database. Everything in the code is the same except for the part where I convert the byte data (that comes from s3 response [StreamingResponseBody]) to a utf-8 string data inside the _read1() function. And the copy is extremely slow. The 25mb-ish file takes more than 2 secs to copy.

def _read1(self, n=None):
    while not self._buff:
        try:
            self._buff = next(self._iter)
            self._buff = self._buff.decode("utf-8")  #  <--------this line here
        except StopIteration:
            break
    ret = self._buff[:n]
    self._buff = self._buff[len(ret):]
    return ret

Do you think this is causing the issue? Also is there an alternative to what I am using?

@hakib
Copy link
Author

hakib commented Jan 22, 2021

Try using TextIOWrapper.

@anandrajakrishnan
Copy link

I used the StringIteratorIO to load data from csv into postgreSQL using the copy_from method available in psycopg2. It is quite fast. With my infrastructure, I was able to load 32 million rows in less than 8 minutes. That was quite impressive. The only observation that I had with it was the buffer/cache went up and it remained high even after the program completed execution. Any suggestion to avoid it?

@hale4029
Copy link

hale4029 commented May 5, 2022

This is amazing. Thank you. The table parameter in copy_from does not accept 'schema.table' reference, therefore I had to use copy_expert. The code below works -- I did not test for memory and speed. The dataframe columns/data/dtypes match the table so I do not specify columns clearly.

def insert_with_string_io(df: pd.DataFrame):
        buffer = io.StringIO()
        df.to_csv(buffer, index=False, header=False)
        buffer.seek(0)
        with conn.cursor() as cursor:
            try:
                cursor.copy_expert(f"COPY <database>.<schema>.<table> FROM STDIN (FORMAT 'csv', HEADER false)" , buffer)
            except (Exception, psycopg2.DatabaseError) as error:
                print("Error: %s" % error)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment