# https://hakibenita.com/fast-load-data-python-postgresql

from typing import Iterator, Dict, Any, Optional
from urllib.parse import urlencode
import datetime

#------------------------ Profile

import time
from functools import wraps
from memory_profiler import memory_usage

def profile(fn):
    @wraps(fn)
    def inner(*args, **kwargs):
        fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
        print(f'\n{fn.__name__}({fn_kwargs_str})')

        # Measure time
        t = time.perf_counter()
        retval = fn(*args, **kwargs)
        elapsed = time.perf_counter() - t
        print(f'Time {elapsed:0.4}')

        # Measure memory
        mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)
        print(f'Memory {max(mem) - min(mem)}')

        return retval
    return inner
#------------------------ Data

import requests

def iter_beers_from_api(page_size: int = 25) -> Iterator[Dict[str, Any]]:
    session = requests.Session()
    page = 1
    while True:
        response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({
            'page': page,
            'per_page': page_size
        }))
        response.raise_for_status()

        data = response.json()
        if not data:
            break

        for beer in data:
            yield beer

        page += 1

def iter_beers_from_file(path: str) -> Iterator[Dict[str, Any]]:
    import json
    with open(path, 'r') as f:
        data = json.load(f)
    for beer in data:
        yield beer
#------------------------ Load

def create_staging_table(cursor):
    cursor.execute("""
        DROP TABLE IF EXISTS staging_beers;
        CREATE TABLE staging_beers (
            id INTEGER,
            name TEXT,
            tagline TEXT,
            first_brewed DATE,
            description TEXT,
            image_url TEXT,
            abv DECIMAL,
            ibu DECIMAL,
            target_fg DECIMAL,
            target_og DECIMAL,
            ebc DECIMAL,
            srm DECIMAL,
            ph DECIMAL,
            attenuation_level DECIMAL,
            brewers_tips TEXT,
            contributed_by TEXT,
            volume INTEGER
        );
    """)

def parse_first_brewed(text: str) -> datetime.date:
    # The API returns either a month and year ("09/2007") or just a year ("2007").
    parts = text.split('/')
    if len(parts) == 2:
        return datetime.date(int(parts[1]), int(parts[0]), 1)
    elif len(parts) == 1:
        return datetime.date(int(parts[0]), 1, 1)
    else:
        assert False, 'Unknown date format'
@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        for beer in beers:
            cursor.execute("""
                INSERT INTO staging_beers VALUES (
                    %(id)s,
                    %(name)s,
                    %(tagline)s,
                    %(first_brewed)s,
                    %(description)s,
                    %(image_url)s,
                    %(abv)s,
                    %(ibu)s,
                    %(target_fg)s,
                    %(target_og)s,
                    %(ebc)s,
                    %(srm)s,
                    %(ph)s,
                    %(attenuation_level)s,
                    %(brewers_tips)s,
                    %(contributed_by)s,
                    %(volume)s
                );
            """, {
                **beer,
                'first_brewed': parse_first_brewed(beer['first_brewed']),
                'volume': beer['volume']['value'],
            })
# http://initd.org/psycopg/docs/cursor.html#cursor.executemany

@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        all_beers = [{
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers]

        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, all_beers)
@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        cursor.executemany("""
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers))
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_batch

import psycopg2.extras

@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        all_beers = [{
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers]

        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, all_beers, page_size=page_size)
@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)

        iter_beers = ({
            **beer,
            'first_brewed': parse_first_brewed(beer['first_brewed']),
            'volume': beer['volume']['value'],
        } for beer in beers)

        psycopg2.extras.execute_batch(cursor, """
            INSERT INTO staging_beers VALUES (
                %(id)s,
                %(name)s,
                %(tagline)s,
                %(first_brewed)s,
                %(description)s,
                %(image_url)s,
                %(abv)s,
                %(ibu)s,
                %(target_fg)s,
                %(target_og)s,
                %(ebc)s,
                %(srm)s,
                %(ph)s,
                %(attenuation_level)s,
                %(brewers_tips)s,
                %(contributed_by)s,
                %(volume)s
            );
        """, iter_beers, page_size=page_size)
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_values

import psycopg2.extras

@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, [(
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers])
@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        psycopg2.extras.execute_values(cursor, """
            INSERT INTO staging_beers VALUES %s;
        """, ((
            beer['id'],
            beer['name'],
            beer['tagline'],
            parse_first_brewed(beer['first_brewed']),
            beer['description'],
            beer['image_url'],
            beer['abv'],
            beer['ibu'],
            beer['target_fg'],
            beer['target_og'],
            beer['ebc'],
            beer['srm'],
            beer['ph'],
            beer['attenuation_level'],
            beer['brewers_tips'],
            beer['contributed_by'],
            beer['volume']['value'],
        ) for beer in beers), page_size=page_size)
# http://initd.org/psycopg/docs/cursor.html#cursor.copy_from
# https://docs.python.org/3.7/library/io.html?io.StringIO#io.StringIO

import io

def clean_csv_value(value: Optional[Any]) -> str:
    if value is None:
        return r'\N'
    return str(value).replace('\n', '\\n')

@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        csv_file_like_object = io.StringIO()
        for beer in beers:
            # Values must appear in the same order as the columns of staging_beers.
            csv_file_like_object.write('|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n')
        csv_file_like_object.seek(0)
        cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')
class StringIteratorIO(io.TextIOBase):
    """A read-only, file-like object over an iterator of strings."""

    def __init__(self, iter: Iterator[str]):
        self._iter = iter
        self._buff = ''

    def readable(self) -> bool:
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        # Return up to n characters, pulling the next string from the
        # iterator only when the internal buffer runs dry.
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break
        ret = self._buff[:n]
        self._buff = self._buff[len(ret):]
        return ret

    def read(self, n: Optional[int] = None) -> str:
        line = []
        if n is None or n < 0:
            # Read everything that remains.
            while True:
                m = self._read1()
                if not m:
                    break
                line.append(m)
        else:
            while n > 0:
                m = self._read1(n)
                if not m:
                    break
                n -= len(m)
                line.append(m)
        return ''.join(line)
@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
    with connection.cursor() as cursor:
        create_staging_table(cursor)
        beers_string_iterator = StringIteratorIO((
            '|'.join(map(clean_csv_value, (
                beer['id'],
                beer['name'],
                beer['tagline'],
                parse_first_brewed(beer['first_brewed']).isoformat(),
                beer['description'],
                beer['image_url'],
                beer['abv'],
                beer['ibu'],
                beer['target_fg'],
                beer['target_og'],
                beer['ebc'],
                beer['srm'],
                beer['ph'],
                beer['attenuation_level'],
                beer['brewers_tips'],
                beer['contributed_by'],
                beer['volume']['value'],
            ))) + '\n'
            for beer in beers
        ))
        cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|', size=size)
#------------------------ Benchmark

connection = psycopg2.connect(
    host='localhost',
    database='testload',
    user='haki',
    password=None,
)
connection.set_session(autocommit=True)

from psycopg2.extras import NamedTupleCursor

def test(connection, n: int):
    # Make sure the data was loaded.
    with connection.cursor(cursor_factory=NamedTupleCursor) as cursor:
        # Test the number of rows.
        cursor.execute('SELECT COUNT(*) AS cnt FROM staging_beers')
        record = cursor.fetchone()
        assert record.cnt == n, f'Expected {n} rows, got {record.cnt} rows!'

        # Test that the data was loaded, and that transformations were applied correctly.
        cursor.execute("""
            SELECT DISTINCT ON (id)
                *
            FROM
                staging_beers
            WHERE
                id IN (1, 235)
            ORDER BY
                id;
        """)
        beer_1 = cursor.fetchone()
        assert beer_1.name == 'Buzz'
        assert beer_1.first_brewed == datetime.date(2007, 9, 1)
        assert beer_1.volume == 20

        beer_235 = cursor.fetchone()
        assert beer_235.name == 'Mango And Chili Barley Wine'
        assert beer_235.first_brewed == datetime.date(2016, 1, 1)
        assert beer_235.volume == 20
beers = list(iter_beers_from_api()) * 100
insert_one_by_one(connection, beers)
test(connection, len(beers))
insert_executemany(connection, beers)
test(connection, len(beers))
insert_executemany_iterator(connection, beers)
test(connection, len(beers))
insert_execute_batch(connection, beers)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=1)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=100)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=1000)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=10000)
test(connection, len(beers))
insert_execute_values(connection, beers)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=1)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=100)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=1000)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=10000)
test(connection, len(beers))
copy_stringio(connection, beers)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 8)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 16)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 64)
test(connection, len(beers))
jgentil commented Jul 10, 2019

As a minor nitpick, iter_beers_from_api uses an anti-pattern for its counter. Consider using for page in itertools.count(1): instead of the while True / page += 1 idiom, as in the sketch below.
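For example, the paging loop could be rewritten like this (a minimal sketch of the suggested change, not code from the gist itself):

import itertools
import requests
from typing import Any, Dict, Iterator
from urllib.parse import urlencode

def iter_beers_from_api(page_size: int = 25) -> Iterator[Dict[str, Any]]:
    session = requests.Session()
    # itertools.count(1) yields 1, 2, 3, ... so no manual counter is needed.
    for page in itertools.count(1):
        response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({
            'page': page,
            'per_page': page_size,
        }))
        response.raise_for_status()

        data = response.json()
        if not data:
            break

        yield from data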

I really enjoyed this exercise in optimizing data loading, as it's a very good example of how to optimize a lot of Python code in general. Iterators and generators are often glossed over when optimizing for memory because everything seems to "work just fine" without them, until suddenly it doesn't. :)

hakib commented Jul 10, 2019

Oh, nice. I wouldn't call while True an anti-pattern, but itertools.count(1) is a neat idea.

villoro commented Jul 11, 2019

I really like your article. I believe you could improve your results by using pandas to read the data and feed it to the cursor, since pandas uses C under the hood.

I did something that I think is very similar:

from io import StringIO

output = StringIO()
df.to_csv(output, sep=';', header=False, index=False, columns=df.columns)

# Jump back to the start of the stream
output.seek(0)

# Insert the DataFrame into Postgres
connection = engine.raw_connection()
with connection.cursor() as cursor:
    cursor.copy_from(output, "my_table", sep=';', null="NULL", columns=df.columns)
    connection.commit()

You can read more about that here

hakib commented Jul 11, 2019

Hey villoro,
The point of the article is not only to load the data fast, but to do it with very little memory. From the looks of it, your suggested method loads the entire file into memory. If that's the case, you won't be able to use it for very large files. Also, I would love to see benchmark results compared to the other methods listed above.

Haki.

villoro commented Jul 12, 2019

Yes, that is true. On my computer (16 GB of RAM) I start having problems with CSV files of around 10 GB. For reading really large files I would suggest using dask or pyspark. Both should be significantly faster than plain Python code, but I have not really tested their memory usage.

And you could always use pandas to read the CSV in chunks and avoid memory limits, with something like:

chunksize = 10 ** 6 # or any number you want
for chunk in pd.read_csv(filename, chunksize=chunksize):
    process(chunk)
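To tie this back to the post's memory goal, here is a minimal sketch of how such chunked reads could feed COPY while holding only one chunk in memory at a time (copy_csv_in_chunks is a hypothetical helper; it assumes a psycopg2 connection and a pipe-delimited target table):

import io

import pandas as pd

def copy_csv_in_chunks(connection, filename: str, table: str, chunksize: int = 10 ** 6) -> None:
    with connection.cursor() as cursor:
        for chunk in pd.read_csv(filename, chunksize=chunksize):
            # Serialize just this chunk, then stream it to the database.
            buffer = io.StringIO()
            chunk.to_csv(buffer, sep='|', header=False, index=False)
            buffer.seek(0)
            cursor.copy_from(buffer, table, sep='|')
    connection.commit()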
jgentil commented Jul 26, 2019

Oh, nice idea. I wouldn't call while True an anti pattern but itertools.count(1) is a neat idea.

Sorry, I didn't mean to call while True itself an anti-pattern, but specifically "while True with a counter".

For the other discussion, I agree with @hakib regarding minimizing memory usage. The code @villoro wrote to buffer everything into a StringIO seems counterproductive to the idea of this post. I love the idea of using almost zero memory to load large datasets into Postgres.

Thanks for the really excellent writeup on this.

hakib commented Jul 28, 2019

@jgentil itertools.count(1) is actually a pretty great idea and I'm going to use it in my own projects. Thanks :)

franz101 commented Sep 7, 2019

Pure GOLD. What about ingesting JSON data?

hakib commented Sep 8, 2019

Hey @franz101,
Thanks ;)

The benchmark is already ingesting JSON data. It does that by converting it to CSV on the fly and importing it into the database using PostgreSQL's COPY command.

The article explains it.
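In miniature, the approach looks like this (a condensed sketch reusing clean_csv_value and StringIteratorIO from the gist above; load_json_records and the two-column load are hypothetical simplifications):

def load_json_records(connection, records: Iterator[Dict[str, Any]]) -> None:
    # Lazily turn each JSON record into one CSV line; nothing is materialized in memory.
    csv_lines = StringIteratorIO(
        '|'.join(map(clean_csv_value, (r['id'], r['name']))) + '\n'
        for r in records
    )
    with connection.cursor() as cursor:
        cursor.copy_from(csv_lines, 'staging_beers', sep='|', columns=('id', 'name'))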

jaumevalls commented Oct 10, 2019

Hello,

First of all, thanks for the tutorial, it looks very interesting.

I would like to fetch paginated data from MongoDB and insert it into PostgreSQL.

When I try the code below, nothing happens.

Does anyone know what I'm doing wrong?

Thanks in advance

hakib commented Oct 11, 2019

Hey @jaumevalls, it's hard to understand from your comment what's not working. However, you can start by making sure print(list(iter_beers_from_api())) produces valid "CSV" text.

jaumevalls commented Oct 11, 2019

Nothing, there is nothing printed to the console. Attached you can find the code. Maybe you could give me a hand.

https://gist.github.com/jaumevalls/90281e5476cb1101acee5de67edddb77

Thanks

hakib commented Oct 11, 2019

I'm not familiar with the MongoDB client, but are you sure that data is a generator?

    yield from data  

There is a difference between yield and yield from; see the example below.
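To illustrate the difference (a minimal standalone example, not from the gist):

def nested():
    yield 1
    yield 2

def using_yield():
    # Yields the inner generator object as a single item.
    yield nested()

def using_yield_from():
    # Delegates to the inner generator, yielding its items one by one.
    yield from nested()

print(list(using_yield()))       # [<generator object nested at 0x...>]
print(list(using_yield_from()))  # [1, 2]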

jaumevalls commented Oct 11, 2019

No, data is a cursor; I think this is where I have the problem. I'm new to Python; previously I used an ETL tool to move data from MongoDB to Postgres, but the performance was very low.

I assume I need to change the way I fetch the data.

jaumevalls commented Oct 12, 2019

I solved the issue. Thank you very much for your help, much appreciated.

hakib commented Oct 13, 2019

That's great @jaumevalls! How do the performance and memory usage compare to your existing ETL process?

jaumevalls commented Oct 14, 2019

Hey! I did some tests this weekend, and importing 800,000 rows takes more or less 3 minutes. I'm not able to use the profile decorator to capture the full memory load.

My main function looks like this:

mess_chunk_iter = iterate_by_chunks(col, 1000, 0, query={'tim': {'$gt': '1571041103'}}, projection={})
for docs in mess_chunk_iter:
    copy_string_iterator(connection_string, docs, size=1024)

Where should I put @profile in order to measure?

hakib commented Oct 14, 2019

You might want to pass the generator directly to copy_string_iterator(connection_string, mess_chunk_iter).
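One way to do that (a sketch assuming iterate_by_chunks yields batches of documents; itertools.chain.from_iterable flattens them into a single stream):

import itertools

mess_chunk_iter = iterate_by_chunks(col, 1000, 0, query={'tim': {'$gt': '1571041103'}}, projection={})

# Flatten the chunks into one generator so a single call (and @profile) covers the whole load.
all_docs = itertools.chain.from_iterable(mess_chunk_iter)
copy_string_iterator(connection_string, all_docs, size=1024)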
