Skip to content

Instantly share code, notes, and snippets.

@jaumevalls
Created October 11, 2019 10:13
Show Gist options
  • Save jaumevalls/90281e5476cb1101acee5de67edddb77 to your computer and use it in GitHub Desktop.
Save jaumevalls/90281e5476cb1101acee5de67edddb77 to your computer and use it in GitHub Desktop.
from typing import Iterator, Dict, Any
from pymongo import MongoClient
from typing import Iterator, Optional
import io
import psycopg2
import psycopg2.extras
import time
from functools import wraps
from memory_profiler import memory_usage
# PostgreSQL connection settings in key=value form (database `acc_rep` on localhost).
# NOTE(review): the credentials are hard-coded in source; consider loading them
# from the environment or a config file instead.
connection_string = "dbname='acc_rep' user='postgres' host='localhost' password='postgres'"
def profile(fn):
    """Decorator that prints the elapsed time and memory growth of ``fn``.

    The wrapper prints the function name with its keyword arguments
    (positional arguments are not shown), then a ``Time`` line and a
    ``Memory`` line.

    NOTE: ``fn`` is executed twice per call -- first under the timer, then
    again under ``memory_usage`` -- so its side effects happen twice, and the
    value returned to the caller comes from the second run.
    """
    @wraps(fn)
    def inner(*args, **kwargs):
        rendered_kwargs = ', '.join(
            f'{name}={value}' for name, value in kwargs.items()
        )
        print(f'\n{fn.__name__}({rendered_kwargs})')

        # First run: wall-clock timing.
        started_at = time.perf_counter()
        retval = fn(*args, **kwargs)
        elapsed = time.perf_counter() - started_at
        print(f'Time {elapsed:0.4}')

        # Second run: sample memory while fn executes again.
        samples, retval = memory_usage(
            (fn, args, kwargs),
            retval=True,
            timeout=200,
            interval=1e-7,
        )
        print(f'Memory {max(samples) - min(samples)}')
        return retval

    return inner
class StringIteratorIO(io.TextIOBase):
    """Read-only text stream backed by an iterator of strings.

    Strings are pulled from the iterator on demand, so the full text is
    never held in memory at once; only the unread tail of the current
    string is buffered.
    """

    def __init__(self, iter: Iterator[str]):
        self._iter = iter
        self._buff = ''

    def readable(self) -> bool:
        """Report that the stream supports reading."""
        return True

    def _read1(self, n: Optional[int] = None) -> str:
        """Return up to ``n`` characters taken from a single source string.

        With ``n`` of ``None`` the whole buffered string is returned. An
        empty result means the underlying iterator is exhausted.
        """
        # Refill the buffer, skipping over empty strings from the iterator.
        while not self._buff:
            try:
                self._buff = next(self._iter)
            except StopIteration:
                break
        head = self._buff[:n]
        self._buff = self._buff[len(head):]
        return head

    def read(self, n: Optional[int] = None) -> str:
        """Read up to ``n`` characters; read everything if ``n`` is None or negative."""
        pieces = []
        if n is not None and n >= 0:
            remaining = n
            while remaining > 0:
                piece = self._read1(remaining)
                if not piece:
                    break
                remaining -= len(piece)
                pieces.append(piece)
        else:
            while True:
                piece = self._read1()
                if not piece:
                    break
                pieces.append(piece)
        return ''.join(pieces)
def clean_csv_value(value: Optional[Any], delimiter: str = '|') -> str:
    """Render ``value`` as one field of a PostgreSQL COPY text-format row.

    ``None`` becomes the COPY NULL marker ``\\N``. Any other value is
    converted with ``str()`` and the characters that COPY text format treats
    specially are backslash-escaped: the backslash itself, newline, carriage
    return and the column delimiter. Without this, a value containing one of
    them would split or corrupt the row.

    Args:
        value: The field value, or ``None`` for SQL NULL.
        delimiter: The single-character column delimiter used by the COPY
            command. Defaults to ``'|'``.

    Returns:
        The escaped field text.
    """
    if value is None:
        return r'\N'
    # Backslashes must be doubled first so the escapes added below are not
    # themselves re-escaped.
    text = (
        str(value)
        .replace('\\', '\\\\')
        .replace('\n', '\\n')
        .replace('\r', '\\r')
    )
    if delimiter:
        text = text.replace(delimiter, '\\' + delimiter)
    return text
def iter_beers_from_api(page_size: int = 1000) -> Iterator[Dict[str, Any]]:
    """Yield documents from the ``trackingdb.clk`` MongoDB collection, page by page.

    Only documents whose ``tim`` field is greater than ``'1570579200'`` and
    at most ``'1570620665'`` are returned (the bounds are passed as strings,
    exactly as in the query). Documents are fetched ``page_size`` at a time
    using ``$skip``/``$limit``.

    NOTE(review): the pipeline has no ``$sort`` stage, so the page boundaries
    rely on the server returning documents in a stable order between calls --
    confirm this is acceptable or add an explicit sort.

    Args:
        page_size: Number of documents requested per aggregate call.

    Yields:
        One document (as a dict) at a time.
    """
    client = MongoClient("mongodb://localhost:27017")
    try:
        col = client["trackingdb"]["clk"]
        skipped = 0
        while True:
            # Materialize the page: the cursor object returned by aggregate()
            # is not falsy when empty, so it cannot be tested directly.
            batch = list(col.aggregate([
                {'$match': {"tim": {"$gt": '1570579200', "$lte": '1570620665'}}},
                {'$skip': skipped},
                {'$limit': page_size},
            ]))
            if not batch:
                break
            yield from batch
            if len(batch) < page_size:
                # A short page is the last one; skip the extra empty query.
                break
            # Advance by a whole page so consecutive pages do not overlap.
            skipped += page_size
    finally:
        # Runs on exhaustion, on error, and when the generator is closed early.
        client.close()
def insert_one_by_one(connection_string, beers: Iterator[Dict[str, Any]]) -> None:
    """Truncate ``clk_day.clk_day_1_master`` and insert ``beers`` one row per statement.

    The connection runs in autocommit mode, so each INSERT is committed
    individually. Any exception (connection failure, a document missing one
    of the expected keys, SQL error) stops the load and is reported on stdout
    rather than raised.

    Args:
        connection_string: DSN passed to ``psycopg2.connect``.
        beers: Iterable of documents; each must contain every key listed in
            ``columns`` below.
    """
    table = "clk_day.clk_day_1_master"
    # Single source of truth for both the column list and the value order.
    columns = (
        '_id', 'org', 'act', 'pub', 'gel', 'url', 'adg', 'nid',
        'ei1', 'ei2', 'ei3', 'ei4', 'sid', 'sid2', 'ppa', 'coo',
        'dom', 'fid', 'tim', 'pca', 'uui', 'fin', 'szc', 'sfo',
        'pde', 'uac', 'uaa', 'uag', 'uip', 'ref', 'eme', 'val',
        'ifa', 'gai', 'enc', 'dev', 'unq',
    )
    insert_sql = "INSERT INTO {} ({}) VALUES ({})".format(
        table,
        ','.join(columns),
        ','.join(['%s'] * len(columns)),
    )

    conn = None
    try:
        conn = psycopg2.connect(connection_string)
        conn.autocommit = True
        with conn.cursor() as cursor:
            cursor.execute("Truncate {} Cascade;".format(table))
            for beer in beers:
                # _id is stringified; every other field is passed through as-is.
                row = (str(beer['_id']),) + tuple(beer[name] for name in columns[1:])
                cursor.execute(insert_sql, row)
    except Exception as e:
        print('Error {}'.format(str(e)))
    finally:
        # Release the connection even when the load fails part-way.
        if conn is not None:
            conn.close()
def _beer_to_copy_line(beer: Dict[str, Any]) -> str:
    """Serialize one document as a '|'-delimited, newline-terminated COPY line."""
    # NOTE(review): the COPY command names no columns, so this order (and the
    # literal filler values) presumably mirrors the table's column order --
    # confirm against the schema.
    values = (
        str(beer['_id']),
        beer['org'],
        beer['act'],
        beer['pub'],
        beer['gel'],
        beer['url'],
        beer['adg'],
        beer['nid'],
        beer['ei1'],
        beer['ei2'],
        beer['ei3'],
        beer['ei4'],
        beer['ppa'],
        beer['coo'],
        beer['pca'],
        beer['tim'],
        beer['uui'],
        beer['fin'],
        beer['uac'],
        beer['szc'],
        beer['sfo'],
        beer['pde'],
        beer['uaa'],
        beer['uag'],
        beer['uip'],
        beer['ref'],
        beer['eme'],
        beer['val'],
        beer['ifa'],
        beer['gai'],
        beer['enc'],
        0,
        0,
        0,
        0,
        0,
        '',
        '0',
        beer['unq'],
        beer['dev'],
        beer['dom'],
        beer['fid'],
        beer['sid'],
        beer['sid2'],
    )
    return '|'.join(map(clean_csv_value, values)) + '\n'


@profile
def copy_string_iterator(connection_string, beers: Iterator[Dict[str, Any]], size: int = 8192) -> None:
    """Truncate ``clk_day.clk_day_1_master`` and bulk-load ``beers`` with COPY.

    Documents are serialized lazily into '|'-delimited text lines and
    streamed to PostgreSQL through a ``StringIteratorIO``, so the full data
    set is never built in memory. Any exception is reported on stdout rather
    than raised.

    Args:
        connection_string: DSN passed to ``psycopg2.connect``.
        beers: Iterable of documents containing the keys read by
            ``_beer_to_copy_line``.
        size: Read buffer size handed to ``copy_expert``.
    """
    table = "clk_day.clk_day_1_master"
    conn = None
    try:
        conn = psycopg2.connect(connection_string)
        conn.autocommit = True
        with conn.cursor() as cursor:
            cursor.execute("Truncate {} Cascade;".format(table))
            beers_string_iterator = StringIteratorIO(
                _beer_to_copy_line(beer) for beer in beers
            )
            # copy_expert takes (sql, file, size); the delimiter belongs in
            # the COPY statement itself, not in a keyword argument.
            cursor.copy_expert(
                "COPY {} FROM STDIN WITH (FORMAT text, DELIMITER '|')".format(table),
                beers_string_iterator,
                size=size,
            )
    except Exception as e:
        print('Error Exception {}'.format(str(e)))
    finally:
        # Release the connection even when the load fails.
        if conn is not None:
            conn.close()
if __name__ == "__main__":
    # Debug run: pull every matching document from MongoDB and print them.
    fetched_documents = list(iter_beers_from_api())
    print(fetched_documents)
    # To load the documents into PostgreSQL instead:
    #beers = iter_beers_from_api()
    #copy_string_iterator(connection_string, iter(beers), size=1024)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment