Skip to content

Instantly share code, notes, and snippets.

@ThinkCode
Forked from hakib/beers.py
Created January 28, 2021 07:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ThinkCode/79cdeadaa0703c36cd2338c2288a02b8 to your computer and use it in GitHub Desktop.
Save ThinkCode/79cdeadaa0703c36cd2338c2288a02b8 to your computer and use it in GitHub Desktop.
# https://hakibenita.com/fast-load-data-python-postgresql
from typing import Iterator, Dict, Any, Optional
from urllib.parse import urlencode
import datetime
#------------------------ Profile
import time
from functools import wraps
from memory_profiler import memory_usage
def profile(fn):
@wraps(fn)
def inner(*args, **kwargs):
fn_kwargs_str = ', '.join(f'{k}={v}' for k, v in kwargs.items())
print(f'\n{fn.__name__}({fn_kwargs_str})')
# Measure time
t = time.perf_counter()
retval = fn(*args, **kwargs)
elapsed = time.perf_counter() - t
print(f'Time {elapsed:0.4}')
# Measure memory
mem, retval = memory_usage((fn, args, kwargs), retval=True, timeout=200, interval=1e-7)
print(f'Memory {max(mem) - min(mem)}')
return retval
return inner
#------------------------ Data
import requests
def iter_beers_from_api(page_size: int = 25) -> Iterator[Dict[str, Any]]:
session = requests.Session()
page = 1
while True:
response = session.get('https://api.punkapi.com/v2/beers?' + urlencode({
'page': page,
'per_page': page_size
}))
response.raise_for_status()
data = response.json()
if not data:
break
for beer in data:
yield beer
page += 1
def iter_beers_from_file(path: str) -> Iterator[Dict[str, Any]]:
import json
with open(path, 'r') as f:
data = json.load(f)
for beer in data:
yield beer
#------------------------ Load
def create_staging_table(cursor):
cursor.execute("""
DROP TABLE IF EXISTS staging_beers;
CREATE TABLE staging_beers (
id INTEGER,
name TEXT,
tagline TEXT,
first_brewed DATE,
description TEXT,
image_url TEXT,
abv DECIMAL,
ibu DECIMAL,
target_fg DECIMAL,
target_og DECIMAL,
ebc DECIMAL,
srm DECIMAL,
ph DECIMAL,
attenuation_level DECIMAL,
brewers_tips TEXT,
contributed_by TEXT,
volume INTEGER
);
""")
def parse_first_brewed(text: str) -> datetime.date:
parts = text.split('/')
if len(parts) == 2:
return datetime.date(int(parts[1]), int(parts[0]), 1)
elif len(parts) == 1:
return datetime.date(int(parts[0]), 1, 1)
else:
assert False, 'Unknown date format'
@profile
def insert_one_by_one(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
for beer in beers:
cursor.execute("""
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", {
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
})
# http://initd.org/psycopg/docs/cursor.html#cursor.executemany
@profile
def insert_executemany(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
all_beers = [{
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers]
cursor.executemany("""
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", all_beers)
@profile
def insert_executemany_iterator(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
cursor.executemany("""
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", ({
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers))
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_batch
import psycopg2.extras
@profile
def insert_execute_batch(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
all_beers = [{
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers]
psycopg2.extras.execute_batch(cursor, """
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", all_beers, page_size=page_size)
@profile
def insert_execute_batch_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
iter_beers = ({
**beer,
'first_brewed': parse_first_brewed(beer['first_brewed']),
'volume': beer['volume']['value'],
} for beer in beers)
psycopg2.extras.execute_batch(cursor, """
INSERT INTO staging_beers VALUES (
%(id)s,
%(name)s,
%(tagline)s,
%(first_brewed)s,
%(description)s,
%(image_url)s,
%(abv)s,
%(ibu)s,
%(target_fg)s,
%(target_og)s,
%(ebc)s,
%(srm)s,
%(ph)s,
%(attenuation_level)s,
%(brewers_tips)s,
%(contributed_by)s,
%(volume)s
);
""", iter_beers, page_size=page_size)
# http://initd.org/psycopg/docs/extras.html#psycopg2.extras.execute_values
import psycopg2.extras
@profile
def insert_execute_values(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
psycopg2.extras.execute_values(cursor, """
INSERT INTO staging_beers VALUES %s;
""", [(
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
) for beer in beers])
@profile
def insert_execute_values_iterator(connection, beers: Iterator[Dict[str, Any]], page_size: int = 100) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
psycopg2.extras.execute_values(cursor, """
INSERT INTO staging_beers VALUES %s;
""", ((
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
) for beer in beers), page_size=page_size)
# http://initd.org/psycopg/docs/cursor.html#cursor.copy_from
# https://docs.python.org/3.7/library/io.html?io.StringIO#io.StringIO
import io
def clean_csv_value(value: Optional[Any]) -> str:
if value is None:
return r'\N'
return str(value).replace('\n', '\\n')
@profile
def copy_stringio(connection, beers: Iterator[Dict[str, Any]]) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
csv_file_like_object = io.StringIO()
for beer in beers:
csv_file_like_object.write('|'.join(map(clean_csv_value, (
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['contributed_by'],
beer['brewers_tips'],
beer['volume']['value'],
))) + '\n')
csv_file_like_object.seek(0)
cursor.copy_from(csv_file_like_object, 'staging_beers', sep='|')
class StringIteratorIO(io.TextIOBase):
def __init__(self, iter: Iterator[str]):
self._iter = iter
self._buff = ''
def readable(self) -> bool:
return True
def _read1(self, n: Optional[int] = None) -> str:
while not self._buff:
try:
self._buff = next(self._iter)
except StopIteration:
break
ret = self._buff[:n]
self._buff = self._buff[len(ret):]
return ret
def read(self, n: Optional[int] = None) -> str:
line = []
if n is None or n < 0:
while True:
m = self._read1()
if not m:
break
line.append(m)
else:
while n > 0:
m = self._read1(n)
if not m:
break
n -= len(m)
line.append(m)
return ''.join(line)
@profile
def copy_string_iterator(connection, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
with connection.cursor() as cursor:
create_staging_table(cursor)
beers_string_iterator = StringIteratorIO((
'|'.join(map(clean_csv_value, (
beer['id'],
beer['name'],
beer['tagline'],
parse_first_brewed(beer['first_brewed']).isoformat(),
beer['description'],
beer['image_url'],
beer['abv'],
beer['ibu'],
beer['target_fg'],
beer['target_og'],
beer['ebc'],
beer['srm'],
beer['ph'],
beer['attenuation_level'],
beer['brewers_tips'],
beer['contributed_by'],
beer['volume']['value'],
))) + '\n'
for beer in beers
))
cursor.copy_from(beers_string_iterator, 'staging_beers', sep='|', size=size)
#------------------------ Benchmark
connection = psycopg2.connect(
host='localhost',
database='testload',
user='haki',
password=None,
)
connection.set_session(autocommit=True)
from psycopg2.extras import NamedTupleCursor
def test(connection, n: int):
# Make sure the data was loaded
with connection.cursor(cursor_factory=psycopg2.extras.NamedTupleCursor) as cursor:
# Test number of rows.
cursor.execute('SELECT COUNT(*) AS cnt FROM staging_beers')
record = cursor.fetchone()
assert record.cnt == n, f'Expected {n} rows, got {rowcount} rows!'
# Test that the data was loaded, and that transformations were applied correctly.
cursor.execute("""
SELECT DISTINCT ON (id)
*
FROM
staging_beers
WHERE
id IN (1, 235)
ORDER BY
id;
""")
beer_1 = cursor.fetchone()
assert beer_1.name == 'Buzz'
assert beer_1.first_brewed == datetime.date(2007, 9, 1)
assert beer_1.volume == 20
beer_235 = cursor.fetchone()
assert beer_235.name == 'Mango And Chili Barley Wine'
assert beer_235.first_brewed == datetime.date(2016, 1, 1)
assert beer_235.volume == 20
beers = list(iter_beers_from_api()) * 100
insert_one_by_one(connection, beers)
test(connection, len(beers))
insert_executemany(connection, beers)
test(connection, len(beers))
insert_executemany_iterator(connection, beers)
test(connection, len(beers))
insert_execute_batch(connection, beers)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=1)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=100)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=1000)
test(connection, len(beers))
insert_execute_batch_iterator(connection, beers, page_size=10000)
test(connection, len(beers))
insert_execute_values(connection, beers)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=1)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=100)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=1000)
test(connection, len(beers))
insert_execute_values_iterator(connection, beers, page_size=10000)
test(connection, len(beers))
copy_stringio(connection, beers)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 8)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 16)
test(connection, len(beers))
copy_string_iterator(connection, beers, size=1024 * 64)
test(connection, len(beers))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment