Parallel loading (gist by @blaggacao, created October 9, 2018)
"""Load data efficiently into Odoo"""
import threading
import xmlrpclib
import hashlib
import sys
import json
import datetime
import pandas as pd, numpy as np
from time import time, sleep
from ast import literal_eval
from functools import partial
from rpc_thread import RpcThread
from utils import log
LOAD_PARAMS = [
    "group_by",
    "odoo_context",
    "chunk_size",
    "worker",
    "call"
]
def get_load_config(config):
    return {
        'group_by':   config.get(LOAD_PARAMS[0]),
        'context':    config.get(LOAD_PARAMS[1]) or {},
        'chunk_size': int(config.get(LOAD_PARAMS[2]) or 15),
        'worker':     int(config.get(LOAD_PARAMS[3]) or 3),
        'call':       config.get(LOAD_PARAMS[4]) or [],
    }
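
# For illustration: a config only needs the keys it overrides; everything else
# falls back to the defaults above ('partner_id' is a made-up column name):
#
#   get_load_config({'group_by': 'partner_id', 'worker': '6'})
#   # => {'group_by': 'partner_id', 'context': {}, 'chunk_size': 15,
#   #     'worker': 6, 'call': []}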
def convert(item):
    """Prep vals for loading."""
    try:
        return item.item()  # Flatten numpy dtypes to native Python scalars
    except AttributeError:
        if item != item:  # NaN is the only value that is not equal to itself
            return ''
        return item
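
# Quick sketch of convert()'s behavior (not part of the original gist):
#   convert(np.int64(5))   # => 5, flattened to a native int via .item()
#   convert(float('nan'))  # => '', since NaN is the only value where x != x
#   convert('abc')         # => 'abc', plain values pass through unchanged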
class DummyLock(object):

    @staticmethod
    def acquire(blocking=None):
        return True

    @staticmethod
    def release():
        return True
class BaseLoader(object):
    """
    Loader for Odoo Model Data, supporting
    - Parallel loading via multiple connections (threads/workers)
    - Batching by chunk_size to avoid timeouts
    - Group-by locks to avoid concurrent updates
    - Implicit toposort for parent column dependency resolution
    """

    def __repr__(self):
        return '<%s object for %r at %s>' % (
            self.__class__.__name__,
            self.model.name,
            hex(id(self))
        )

    def __init__(self, Model, config={}):
        self.model = Model
        self.conn = Model.rModel
        self.config = get_load_config(config or {})  # get_load_config fills in defaults for missing keys
        self.config.update({
            'parent_field': self.model._parent_name,
            'groupby_flds': self.model.get_group_by_candidates()
        })
        # Keep track of locks globally
        self.gby_lock_map = {}
        self.dep_lock_map = {}
        self.grp_lock_map = {}
        # Retain result ids for call methods
        self.result_ids = []
    def _load_schema(self, schema):
        self.headers = [col['name'] for col in schema['columns']]

    def _get_indices(self, headers):
        headers = [h.split('/')[0] for h in headers]  # Normalize field names to Odoo
        groupby_flds = [f for f in self.config['groupby_flds'] if f in headers]
        self.indices = {}
        for item in ['group_by', 'parent_field']:
            try:
                self.indices[item] = headers.index(self.config[item])
            except ValueError:
                log("Column {!r} not found".format(self.config[item]))
                self.indices[item] = None
        self.indices.update({
            'id_column': headers.index(self.model.id_col) if self.indices['parent_field'] is not None else None,
            # TODO: implement multiple grouping columns
            'group_by': self.indices['group_by'] if self.indices['group_by'] is not None
                        else (headers.index(groupby_flds[0]) if groupby_flds else None),
        })
    def group_data(self, df):
        """
        Group and slice data to enforce chunk size, parent-key toposort and
        mutually exclusive (group-by) values.
        ..param df: pandas.DataFrame
        ..return: pandas groupby object
        """
        chunk = self.config['chunk_size']
        return (
            df
            .assign(
                composite_lock=self._assign_locks
            )
            .groupby(
                by=[
                    (np.arange(len(df)) // chunk) + 1,  # This is the best magic I've ever seen so far!!! (https://stackoverflow.com/a/25703030)
                    'composite_lock',
                ],
                sort=False  # Save resources
            )
        )
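
    # The first groupby key above works like this (cf. the linked SO answer):
    #   (np.arange(7) // 3) + 1  =>  [1, 1, 1, 2, 2, 2, 3]
    # i.e. consecutive rows are binned into lots of `chunk` rows each; every
    # lot is then further split by its composite lock value.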
    def _assign_locks(self, df):
        return zip(
            df.apply(self._make_group_lock_key, axis=1),
            df.apply(self._make_parent_lock_key, axis=1)
        )  # (Unlocked, Locked) by default

    def _make_group_lock_key(self, row):
        """Return an unblocked group-by lock per row."""
        g_ix = self.indices['group_by']
        g_key = convert(row[g_ix]) if g_ix is not None else None
        g_lck = self.gby_lock_map.setdefault(
            g_key,
            g_key and threading.Lock() or DummyLock()
        )
        return g_lck

    def _make_parent_lock_key(self, row):
        """Return a blocked parent-child dependency lock per row."""
        p_ix = self.indices['parent_field']
        p_key = convert(row[p_ix]) if p_ix is not None else None
        p_lck = self.dep_lock_map.setdefault(
            p_key,
            p_key and threading.Lock() or DummyLock()
        )
        # Block parent dependencies by default.
        # They are unblocked once their parent is loaded.
        p_lck.acquire(0)
        return p_lck
    def _make_lock_key(self, row):
        """Combined variant of the two lock helpers above (apparently unused)."""
        g_ix, d_ix = self.indices['group_by'], self.indices['parent_field']
        g_key = convert(row[g_ix]) if g_ix is not None else None
        d_key = convert(row[d_ix]) if d_ix is not None else None
        g_lck = self.gby_lock_map.setdefault(g_key, g_key and threading.Lock() or DummyLock())
        d_lck = self.dep_lock_map.setdefault(d_key, d_key and threading.Lock() or DummyLock())
        d_lck.acquire(0)  # Else: (re-)load the gun
        return (g_lck, d_lck)  # (Unlocked, Locked) by default
    def key_log_context(self, key):
        return "Grp: {exl:>5} / Hie: {rec:>5} / Lot: {lot:>5} / Thrd: {thrd!r:>12}".format(
            exl=key[1][0] and hashlib.md5(str(key[1][0])).hexdigest()[:5],
            rec=key[1][1] and hashlib.md5(str(key[1][1])).hexdigest()[:5],
            lot=key[0], thrd=threading.currentThread().name)
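
    # Note: a group key produced by group_data() has the shape
    #   (lot_number, (group_lock, dependency_lock))
    # key_log_context() renders that tuple for logging; schedule_thread()
    # in boost() below only dispatches a lot once both locks are available.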
    def boost(self, df=None, log_writer=None, prev_load_log=None):
        """
        Threaded loading of data at the speed of light.
        ..param df: pandas.DataFrame
        """
        if df is None or df.empty:
            if not self.model.df.empty:
                df = self.model.df
            else:
                raise Exception("We need a dataframe, either passed explicitly or loaded on the Model object.")
        # Filter out previously loaded rows (based on a json file log)
        if prev_load_log:
            ids_not_to_load = [d[self.model.id_col] for d in prev_load_log if d['loaded'] is not False]
            df = df[~df[self.model.id_col].isin(ids_not_to_load)]
            if not df.shape[0]:
                print "Data for {!r} already completely loaded, aborting.".format(self.model.name)
                return []
        self._get_indices(df.columns)
        print "Chunk Size : " + str(self.config['chunk_size'])
        print "No. Worker : " + str(self.config['worker'])
        print "Groupby Fld: " + str(self.config['group_by'])
        print "Parent Fld : " + str(self.config['parent_field'])
        print "Context    : " + str(self.config['context'])
        thread = RpcThread(self.config['worker'])
        self.run_times = []  # Used to compute the average batch time
        st = time()
        group_obj = self.group_data(df)
        groups = group_obj.groups.copy()

        def schedule_thread(key):
            log_work = partial(log, context=self.key_log_context(key))
            grp_lock = key[1][0]
            dep_lock = key[1][1]
            if grp_lock.acquire(0):
                if dep_lock.acquire(0):  # Dependency (if any) is already resolved
                    dep_lock.release()  # Keep the dependency resolved, if it existed
                    thread.spawn_thread(
                        self._work,
                        group_obj.get_group(key),
                        key, groups, log_writer)
                    grp_lock.release()  # Cleanup
                    return True
                else:
                    grp_lock.release()  # Cleanup
                    log_work("INFO: Dependency locked.")
                    return False
            else:
                log_work("INFO: Group locked.")
                return False

        # Print out some useful logs to validate the hashlib log print-outs
        for gby in self.gby_lock_map.keys():
            key = ['', (self.gby_lock_map[gby], '')]
            log(str(gby), context=self.key_log_context(key))
        for dep in self.dep_lock_map.keys():
            key = ['', ('', self.dep_lock_map[dep])]
            log(str(dep), context=self.key_log_context(key))
        while len(groups) > 0:
            sleep(0.3)
            for key in sorted(groups.keys()):
                sleep(0.3)
                log_work = partial(log, context=self.key_log_context(key))
                if not self.grp_lock_map.setdefault(key, threading.Lock()).acquire(0):
                    log_work("INFO: Lot locked.")
                    continue
                if not schedule_thread(key):
                    self.grp_lock_map[key].release()
                # Give a time estimate after three loads.
                if len(self.run_times) == 3:
                    log('Boosted: Estimated loading time: {:.0f} min.'.format(
                        (sum(self.run_times) / len(self.run_times)) * len(group_obj.groups) / self.config['worker'] / 60))
        thread.wait()
        log('Boosted: Avg/Batch - {:.2f}, Total - {:.2f}, Avg/Row {:.4f}'.format(
            sum(self.run_times) / len(self.run_times),
            (time() - st),
            (time() - st) / len(df.index),
        ))
        return self.result_ids
    def _work(self, group_df, key, groups, log_writer):
        load_df = group_df[group_df.columns.difference(['composite_lock'])]

        def load_failed(result):
            failed = False
            for msg in result['messages']:
                if msg['message'].startswith('could not serialize access due to concurrent update'):
                    return 'serialization issue'
                log_work(msg['message'])
                if msg['type'] == 'error':
                    failed = True
            return failed

        def write_log_file(load_df, result):
            if log_writer:
                output_df = load_df.loc[:, self.model.id_col].to_frame()
                now = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
                output_df['timestamp'] = now
                output_df['batch'] = self.key_log_context(key)
                output_df['loaded'] = result['ids']
                reclist = output_df.to_dict('records')
                for rec in reclist:
                    log_writer.write(json.dumps(rec, sort_keys=True, indent=4))

        log_work = partial(log, context=self.key_log_context(key))
        st = time()
        try:
            # Load the batch
            result = self.conn.load(
                load_df.columns.tolist(),
                load_df.fillna('').astype(str).values.tolist(),
                context=self.config['context']
            )
            write_log_file(load_df, result)
            failure = load_failed(result)  # Evaluate once to avoid duplicate log lines
            # Failed with a serialization error: reschedule
            if failure == 'serialization issue':
                log_work("WARNING: Serialization issue. Rescheduling...")
                self.grp_lock_map[key].release()
                return
            # Failed with errors
            elif failure:
                log_work("Loading failed with errors...")
            # Success
            else:
                self.result_ids.append(result['ids'])
                for r in load_df.loc[:, self.model.id_col].tolist():
                    rc = self.dep_lock_map.get(r, None)  # The last level has no dependency lock defined
                    if rc:
                        rc.release()
        except xmlrpclib.Fault as e:
            log_work("ERROR: Host error: " + e.faultString)
        except Exception:
            log_work("ERROR: Unknown problem", sys.exc_info()[:2])
        del groups[key]
        log_work("Execution Time: {:.4f}".format(time() - st))
        self.run_times.append(float(time() - st))
# rpc_thread.py (imported above via "from rpc_thread import RpcThread")
import threading


class RpcThread(object):

    def __init__(self, max_connection):
        self.semaphore = threading.BoundedSemaphore(max_connection)
        self.max_thread_semaphore = threading.BoundedSemaphore(max_connection * 4)
        self.thread_list = []

    def spawn_thread(self, fun, *args, **kwargs):
        def run(*args, **kwargs):
            self.semaphore.acquire()  # At most max_connection workers run concurrently
            try:
                fun(*args, **kwargs)
            finally:
                self.semaphore.release()
                self.max_thread_semaphore.release()
        # Throttle thread creation: at most max_connection * 4 threads alive at once
        self.max_thread_semaphore.acquire()
        thread = threading.Thread(None, run, None, args=args, kwargs=kwargs)
        thread.start()
        self.thread_list.append(thread)

    def wait(self):
        for t in self.thread_list:
            t.join()

    def thread_number(self):
        return len(self.thread_list)
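
# Minimal usage sketch for RpcThread (illustrative only; `task` is a made-up
# callable, not part of the gist):
#
#   def task(n):
#       print "working on %s" % n
#
#   pool = RpcThread(3)            # at most 3 tasks run concurrently
#   for i in range(10):
#       pool.spawn_thread(task, i)
#   pool.wait()                    # join all spawned threads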
"""Manipulate & Load Pandas DataFreames in an Odoo ETL project"""
import re
import json
import pandas as pd, numpy as np, networkx as nx
from functools import wraps
from pprint import pformat
from os.path import commonprefix
import openerplib
CONN_PARAMS = [
"odoo_host",
"odoo_db",
"odoo_username",
"odoo_password",
"odoo_protocol",
"odoo_port"
]
TYPE_MAP = {
'boolean': 'boolean',
'integer': 'bigint',
'float': 'float',
'monetary': 'float',
'char': 'string',
'text': 'string',
'html': 'string',
'date': 'date',
'datetime': 'date',
'binary': 'string',
'selection': 'string',
'reference': 'string',
'many2one': 'bigint',
'one2many': 'string',
'many2many': 'string',
'serialized': 'string',
}
# Basic logging
def log(*args, **kwargs):
    context = kwargs.get('context', 'n/a')
    for thing in args:
        if type(thing) is dict:
            thing = json.dumps(thing)
        if not isinstance(thing, basestring):
            thing = str(thing)
        if isinstance(thing, unicode):
            thing = thing.encode('ascii', 'ignore')  # Keep the output ascii-safe
        print('Odoo export [{}] - {}'.format(context, thing))


def extract_params(config, params, params_lst):
    """Move the given keys out of config into params."""
    for k in params_lst:
        if k in config:
            if bool(config[k]):
                params.update({k: config[k]})
            del config[k]
def get_conn_config(config):
    config_dict = {
        'hostname': config.get(CONN_PARAMS[0]),
        'database': config.get(CONN_PARAMS[1]),
        'login':    config.get(CONN_PARAMS[2]),
        'password': config.get(CONN_PARAMS[3]),
        'protocol': config.get(CONN_PARAMS[4], "xmlrpc"),
        'port':     int(config.get(CONN_PARAMS[5], "80")),
    }
    if any(x in ("", None) for x in config_dict.values()):
        raise Exception("Mandatory connection parameters are missing.")
    return config_dict


def get_connection(conn_info):
    return openerplib.get_connection(**conn_info)


def check_prefix(prefix, search=re.compile(r'[^a-z0-9.]').search):
    if not search(prefix):
        return prefix
    raise Exception("Prefix name is invalid.")


def check_modell(modell, search=re.compile(r'[^a-z0-9._]').search):
    if not search(modell):
        return modell
    raise Exception("Model name is invalid.")
def get_custom_meta(meta):
    return meta['custom']['kv']


def get_id_column(df):
    if '.id' in df.columns:
        return '.id'
    if 'id' in df.columns:
        return 'id'
    raise Exception("No ID column defined. Define one of 'id' (ExtId) or '.id' (DbId).")


def infer_model(df):
    id_column_list = df.loc[:, 'id'].tolist()
    if len(id_column_list) == 0:
        raise Exception("ID column doesn't have any values.")
    prefix = commonprefix(id_column_list)
    if len(prefix) == 0:
        raise Exception("ID column doesn't have a common prefix in its values.")
    if '.' not in prefix:
        raise Exception('ID column\'s common prefix does not contain a module specification (no "." present).')
    model = (
        prefix
        .split('__')[0]  # Composite keys with only one base value, e.g. import.product_attribute__attr_val1
        .rstrip('_')
        .split('.')[1]
        .replace('_', '.')
    )
    return model
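
# Worked example: for ids sharing the common prefix
# 'import.product_attribute__attr_val' (cf. the comment above), the chain
# resolves to 'product.attribute':
#   .split('__')[0]     => 'import.product_attribute'
#   .rstrip('_')        => 'import.product_attribute'
#   .split('.')[1]      => 'product_attribute'
#   .replace('_', '.')  => 'product.attribute'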
class Project(object):
    """Project level helpers"""

    def __init__(self, prefix, config={}):
        self.prefix = check_prefix(prefix)
        self.conn_info = config and get_conn_config(config) or None
        self.rInstance = self.conn_info and get_connection(self.conn_info) or None
        self.mgraph = nx.DiGraph(prefix=self.prefix)

    def make_xid(self, model, row, idxs):
        p = re.compile(r'[\W_]+')
        vals = {
            'prefix': self.prefix,
            'model': model.replace('.', '_'),
            'iid': p.sub(
                '_',
                row              # pd.Series
                .loc[idxs]
                .astype(str)
                .str
                .cat(sep='_', na_rep='?')
                .lower()
            ),
        }
        return "{prefix}.{model}__{iid}".format(**vals)

    def assign_xid(self, model, df, idxs):
        return df.apply(lambda row: self.make_xid(model, row, idxs), axis=1)
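
    # For instance (made-up values): with prefix 'import', model 'res.partner'
    # and a row whose idxs columns hold ['John', 'Doe'], make_xid() yields
    # 'import.res_partner__john_doe'.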
    def get_inferred_model(self, df, meta={}):
        custom_meta = meta and get_custom_meta(meta) or {}
        if custom_meta:
            if '.id' in df.columns and 'model' not in custom_meta:
                raise Exception("If you have a Db primary Id ('.id' column) defined, you need to specify the model in the metadata: {\"model\": \"MODEL.NAME\"}")
            if 'model' in custom_meta:
                return Model(self, custom_meta['model'], meta=meta)
        if 'id' not in df.columns:
            raise Exception("Neither an id column nor a metadata model is defined in this dataframe.")
        return Model(self, infer_model(df), meta=meta)
class Model(object):
    """Model level helper"""

    def __repr__(self):
        return '<%s object for %r at %s>' % (
            self.__class__.__name__,
            self.name,
            hex(id(self))
        )

    def __init__(self, Project, model, config={}, meta={}):
        self.project = Project
        self.name = check_modell(model)
        self.meta = meta
        self.custom_meta = meta and get_custom_meta(meta) or {}
        self.id_col = None
        self.df = pd.DataFrame()
        self.rInstance = self.project.rInstance
        self.conn_info = config and get_conn_config(config) or self.project.conn_info
        self.rModel = self.conn_info and get_connection(self.conn_info).get_model(self.name) or None
        self.project.mgraph.add_node(self, model=self.name, meta=self.meta)
        if self.rModel:
            try:
                self.info = self.rModel.get_model_info()
                self._parent_name = self.info['_parent_name']
                self._inherits = self.info['_inherits']
                self._fields = self.info['_fields']
            except Exception:
                log("Could not load model info for {!r}".format(self.name))

    def need_model_info(func):
        @wraps(func)
        def func_wrapper(*args, **kwargs):
            if not hasattr(args[0], 'info'):
                raise Exception("Model info was not loaded: {!r}".format(args[0]))
            return func(*args, **kwargs)
        return func_wrapper

    def need_dataframe(func):
        @wraps(func)
        def func_wrapper(*args, **kwargs):
            if args[0].df.empty:
                raise Exception("Dataframe is empty. Load with `load_df(df)`. {!r}".format(args[0]))
            return func(*args, **kwargs)
        return func_wrapper

    def make_xid(self, *args):
        return self.project.make_xid(self.name, *args)

    def assign_xid(self, *args):
        return self.project.assign_xid(self.name, *args)

    @need_model_info
    def get_read_schema(self, fields=[], writable=False, required=False):
        if not isinstance(fields, list):
            raise Exception("Fields must be of type list")
        result = {'columns': []}
        for f, attrs in self._fields.iteritems():
            if writable and attrs['readonly']:
                continue
            if required and not attrs['required']:
                continue
            if not fields or f in fields:
                result['columns'].append({
                    'name': f,
                    'type': TYPE_MAP[attrs['type']],
                    'comment': str(attrs['string']) + '\n>>\n' + str(pformat(attrs, indent=2))
                })
        return result

    @need_model_info
    def get_group_by_candidates(self):
        result = []
        for f, attrs in self._fields.iteritems():
            if attrs['type'] == 'one2many':
                result.append(f)
        return result

    @need_model_info
    def load_df(self, df):
        self.id_col = get_id_column(df)
        self.df = df
        for col in df.columns:
            field = col.split('/')[0]
            if field not in self._fields or field == self._parent_name:
                continue
            if 'relation' not in self._fields[field]:
                continue
            relation = self._fields[field]['relation']
            for node, model in list(self.project.mgraph.nodes(data='model')):
                if relation == model:
                    self.project.mgraph.add_edge(self, node)
    @need_model_info
    @need_dataframe
    def get_invalid_column_names(self):
        result = []
        for col in self.df.columns:
            if col == '.id':  # Db primary Id column
                field = 'id'
            elif '/.' in col:  # DbId relation column
                field = col.split('/.')[0]
            else:
                field = col.split('/')[0]  # ExtId relation column
            result.append(field not in self._fields)
        return result  # Boolean mask over self.df.columns

    @need_model_info
    @need_dataframe
    def get_invalid_relation_column_names(self):
        result = []
        for col in self.df.columns:
            field = col.split('/')[0]
            idstr = col.split('/')[1] if len(col.split('/')) > 1 else None
            if field in self._fields and 'relation' in self._fields[field] and idstr not in ['id', '.id']:
                result.append(col)
        return result

    @need_model_info
    @need_dataframe
    def validate_dataframe(self):
        if True in self.get_invalid_column_names():
            raise Exception("Not all column names are valid in {!r}. Invalids: {!r}".format(
                self.name, self.df.columns[self.get_invalid_column_names()]))
        if u'model' in self.custom_meta:
            if self.custom_meta['model'] != self.name:
                raise Exception("The 'model' metadata key doesn't seem to fit {!r}.".format(self.name))
        elif u'.id' in self.df.columns:
            raise Exception("It seems you have defined a db id column '.id', but haven't defined a model in the df's metadata.")
        elif infer_model(self.df) != self.name:
            raise Exception("The 'id' column doesn't seem to fit {!r}.".format(self.name))
        if len(self.get_invalid_relation_column_names()):
            raise Exception("Relational columns {!r} are not properly formed in {!r}.".format(
                self.get_invalid_relation_column_names(), self.name))
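
# End-to-end usage sketch (illustrative only: connection values and the CSV
# path are made up, and BaseLoader lives in the loader module above):
#
#   import pandas as pd
#
#   project = Project('import', config={
#       'odoo_host': 'localhost', 'odoo_db': 'prod',
#       'odoo_username': 'admin', 'odoo_password': 'secret',
#       'odoo_port': '8069',
#   })
#   df = pd.read_csv('res_partner.csv')     # must carry an 'id' or '.id' column
#   model = project.get_inferred_model(df)  # infers 'res.partner' from the ids
#   model.load_df(df)
#   model.validate_dataframe()
#   loader = BaseLoader(model, config={'chunk_size': '50', 'worker': '4'})
#   loader.boost(df)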