Parallel loading
"""Load data efficiently into Odoo""" | |
import threading | |
import xmlrpclib | |
import hashlib | |
import sys | |
import json | |
import datetime | |
import pandas as pd, numpy as np | |
from time import time, sleep | |
from ast import literal_eval | |
from functools import partial | |
from rpc_thread import RpcThread | |
from utils import log | |
LOAD_PARAMS = [
    "group_by",
    "odoo_context",
    "chunk_size",
    "worker",
    "call"
]
def get_load_config(config):
    return {
        'group_by'  : config.get(LOAD_PARAMS[0]),
        'context'   : config.get(LOAD_PARAMS[1]) or {},
        'chunk_size': int(config.get(LOAD_PARAMS[2]) or 15),
        'worker'    : int(config.get(LOAD_PARAMS[3]) or 3),
        'call'      : config.get(LOAD_PARAMS[4]) or [],
    }
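# An illustrative input for get_load_config (a sketch; every key is optional
# and the values shown are made up):
#   {'group_by': 'order_id', 'odoo_context': {'tracking_disable': True},
#    'chunk_size': '50', 'worker': '4', 'call': []}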
def convert(item):
    """Prep vals for loading"""
    try:
        return item.item()  # Flatten numpy dtypes to native Python
    except AttributeError:
        if item != item:
            return ''  # Item is a NaN (NaN != NaN)
        return item
class DummyLock(object):
    @staticmethod
    def acquire(blocking=None):
        return True
    @staticmethod
    def release():
        return True
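# Design note: DummyLock is a no-op drop-in for threading.Lock, so rows
# without a group-by or parent value can share the same locking code path
# as rows that have one.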
class BaseLoader(object):
    """
    Loader for Odoo Model Data, supporting
    - Parallel loading via multiple connections (threads/workers)
    - Batching by chunk_size to avoid timeouts
    - Group-by lock to avoid concurrent updates
    - Implicit toposort for parent column dependency resolution
    """
    def __repr__(self):
        return '<%s object for %r at %s>' % (
            self.__class__.__name__,
            self.model.name,
            hex(id(self))
        )
    def __init__(self, Model, config=None):
        self.model = Model
        self.conn = Model.rModel
        self.config = get_load_config(config or {})
        self.config.update({
            'parent_field': self.model._parent_name,
            'groupby_flds': self.model.get_group_by_candidates()
        })
        # Keep track of locks globally
        self.gby_lock_map = {}
        self.dep_lock_map = {}
        self.grp_lock_map = {}
        # Retain result ids for call methods
        self.result_ids = []
    def _load_schema(self, schema):
        self.headers = [col['name'] for col in schema['columns']]
    def _get_indices(self, headers):
        headers = [h.split('/')[0] for h in headers]  # Normalize field names to Odoo
        # Indices of group-by candidate fields that are actually present
        groupby_idx = [headers.index(f) for f in self.config['groupby_flds'] if f in headers]
        self.indices = {}
        for item in ['group_by', 'parent_field']:
            try:
                self.indices[item] = headers.index(self.config[item])
            except ValueError:
                log("Column for {!r} not found".format(item))
                self.indices[item] = None
        gby = self.indices['group_by']
        self.indices.update({
            'id_column': headers.index(self.model.id_col) if self.indices['parent_field'] is not None else None,
            'group_by' : gby if gby is not None else (groupby_idx[0] if groupby_idx else None)  # TODO: support multiple grouping columns
        })
    def group_data(self, df):
        """
        Groups & slices data to enforce chunk size, parent-key toposort and
        mutually exclusive (group-by) values.
        ..param df: pandas.DataFrame
        ..return: pandas.groupby object
        """
        chunk = self.config['chunk_size']
        return (
            df
            .assign(
                composite_lock=self._assign_locks
            )
            .groupby(
                by=[
                    (np.arange(len(df))//chunk)+1,  # 1-based chunk labels; neat trick from https://stackoverflow.com/a/25703030
                    'composite_lock',
                ],
                sort=False  # Save resources
            )
        )
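    # A quick sketch of the chunk-label trick used in the groupby above:
    #   np.arange(7) // 3 + 1  ->  array([1, 1, 1, 2, 2, 2, 3])
    # i.e. every row gets a 1-based chunk number, so grouping on it slices
    # the frame into consecutive batches of at most `chunk_size` rows.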
    def _assign_locks(self, df):
        return zip(
            df.apply(self._make_group_lock_key, axis=1),
            df.apply(self._make_parent_lock_key, axis=1)
        )  # (Unlocked, Locked) by default
    def _make_group_lock_key(self, row):
        """Return an unblocked group-by lock per row."""
        g_ix = self.indices['group_by']
        g_key = g_ix is not None and convert(row[g_ix]) or None
        g_lck = self.gby_lock_map.setdefault(
            g_key,
            g_key and threading.Lock() or DummyLock()
        )
        return g_lck
    def _make_parent_lock_key(self, row):
        """Return a blocked parent-child dependency lock per row."""
        p_ix = self.indices['parent_field']
        p_key = p_ix is not None and convert(row[p_ix]) or None
        p_lck = self.dep_lock_map.setdefault(
            p_key,
            p_key and threading.Lock() or DummyLock()
        )
        # Block parent dependencies by default.
        # Unblocked once that parent has been loaded.
        p_lck.acquire(0)
        return p_lck
    def _make_lock_key(self, row):
        # NOTE: legacy combined variant of the two helpers above; kept for
        # reference, no longer called by _assign_locks.
        g_ix, d_ix = self.indices['group_by'], self.indices['parent_field']
        g_key = g_ix is not None and convert(row[g_ix]) or None
        d_key = d_ix is not None and convert(row[d_ix]) or None
        g_lck = self.gby_lock_map.setdefault(g_key, g_key and threading.Lock() or DummyLock())
        d_lck = self.dep_lock_map.setdefault(d_key, d_key and threading.Lock() or DummyLock())
        d_lck.acquire(0)  # Else: (re-)load the gun
        return (g_lck, d_lck)  # (Unlocked, Locked) by default
    def key_log_context(self, key):
        return "Grp: {exl:>5} / Hie: {rec:>5} / Lot: {lot:>5} / Thrd: {thrd!r:>12}".format(
            exl=key[1][0] and hashlib.md5(str(key[1][0])).hexdigest()[:5],
            rec=key[1][1] and hashlib.md5(str(key[1][1])).hexdigest()[:5],
            lot=key[0], thrd=threading.currentThread().name)
    def boost(self, df=None, log_writer=None, prev_load_log=None):
        """
        Threaded loading of data at the speed of light.
        ..param df: pandas.DataFrame
        """
        if df is None or df.empty:
            if not self.model.df.empty:
                df = self.model.df
            else:
                raise Exception("We need a dataframe, either explicitly passed or loaded on the Model object.")
        # Filter out previously loaded rows (based on a json file log)
        if prev_load_log:
            ids_not_to_load = [d[self.model.id_col] for d in prev_load_log if d['loaded'] is not False]
            df = df[~df[self.model.id_col].isin(ids_not_to_load)]
            if not df.shape[0]:
                print "Data for {!r} already completely loaded, aborting.".format(self.model.name)
                return []
        self._get_indices(df.columns)
        print "Chunk Size: " + str(self.config['chunk_size'])
        print "No. Worker: " + str(self.config['worker'])
        print "Groupby Fl: " + str(self.config['group_by'])
        print "Parent Fld: " + str(self.config['parent_field'])
        print "Context   : " + str(self.config['context'])
        thread = RpcThread(self.config['worker'])
        self.run_times = []  # Used to compute the average batch time
        st = time()
        group_obj = self.group_data(df)
        groups = group_obj.groups.copy()
        def schedule_thread(key):
            log_work = partial(log, context=self.key_log_context(key))
            grp_lock = key[1][0]
            dep_lock = key[1][1]
            if grp_lock.acquire(0):
                if dep_lock.acquire(0):  # Dependency resolved (lock was free)
                    dep_lock.release()  # Keep the dependency marked as resolved
                    thread.spawn_thread(
                        self._work,
                        group_obj.get_group(key),
                        key, groups, log_writer)
                    grp_lock.release()  # Cleanup
                    return True
                else:
                    grp_lock.release()  # Cleanup
                    log_work("INFO: Dependency locked.")
                    return False
            else:
                log_work("INFO: Group locked.")
                return False
        # Print out some useful logs to validate the hashlib log print-outs
        for gby in self.gby_lock_map.keys():
            key = ['', (self.gby_lock_map[gby], '')]
            log(str(gby), context=self.key_log_context(key))
        for dep in self.dep_lock_map.keys():
            key = ['', ('', self.dep_lock_map[dep])]
            log(str(dep), context=self.key_log_context(key))
        while len(groups) > 0:
            sleep(0.3)
            for key in sorted(groups.keys()):
                sleep(0.3)
                log_work = partial(log, context=self.key_log_context(key))
                if not self.grp_lock_map.setdefault(key, threading.Lock()).acquire(0):
                    log_work("INFO: Lot locked.")
                    continue
                if not schedule_thread(key):
                    self.grp_lock_map[key].release()
                # Give a time estimate after three loads.
                if len(self.run_times) == 3:
                    log('Boosted: Estimated loading time: {:.0f} min.'.format(
                        (sum(self.run_times)/len(self.run_times))*len(group_obj.groups)/self.config['worker']/60))
        thread.wait()
        log('Boosted: Avg/Batch - {:.2f}, Total - {:.2f}, Avg/Row {:.4f}'.format(
            sum(self.run_times)/len(self.run_times),
            (time() - st),
            (time() - st)/len(df.index),
        ))
        return self.result_ids
    def _work(self, group_df, key, groups, log_writer):
        load_df = group_df[group_df.columns.difference(['composite_lock'])]
        log_work = partial(log, context=self.key_log_context(key))
        def load_failed(result):
            failed = False
            for msg in result['messages']:
                if msg['message'].startswith('could not serialize access due to concurrent update'):
                    return 'serialization issue'
                log_work(msg['message'])
                if msg['type'] == 'error':
                    failed = True
            return failed
        def write_log_file(load_df, result):
            if log_writer:
                output_df = load_df.loc[:, self.model.id_col].to_frame()
                now = datetime.datetime.now().strftime('%Y-%m-%dT%H:%M:%S')
                output_df['timestamp'] = now
                output_df['batch'] = self.key_log_context(key)
                output_df['loaded'] = result['ids']
                reclist = output_df.to_dict('records')
                for rec in reclist:
                    log_writer.write(json.dumps(rec, sort_keys=True, indent=4))
        st = time()
        try:
            # Load the batch
            result = self.conn.load(
                load_df.columns.tolist(),
                load_df.fillna('').astype(str).values.tolist(),
                context=self.config['context']
            )
            write_log_file(load_df, result)
            failure = load_failed(result)  # Evaluate once; it also logs the messages
            # Failed with serialization error, reschedule
            if failure == 'serialization issue':
                log_work("WARNING: Serialization issue. Rescheduling...")
                self.grp_lock_map[key].release()
                return
            # Failed with error
            elif failure:
                log_work("Loading failed with errors...")
            # Success
            else:
                self.result_ids.append(result['ids'])
                for r in load_df.loc[:, self.model.id_col].tolist():
                    rc = self.dep_lock_map.get(r, None)  # The last level has no dependency lock defined.
                    if rc: rc.release()
        except xmlrpclib.Fault as e:
            log_work("ERROR: Host error: " + e.faultString)
        except Exception:
            log_work("ERROR: Unknown problem", sys.exc_info()[:2])
        del groups[key]
        log_work("Execution Time: {:.4f}".format(time() - st))
        self.run_times.append(float(time() - st))
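# A minimal end-to-end sketch (assumes Project/Model from utils.py below and a
# dataframe `df` whose 'id' column carries external ids like 'import.res_partner__x';
# `conn_config` is a hypothetical dict of odoo_* connection parameters):
#
#   from utils import Project
#   project = Project('import', config=conn_config)
#   model = project.get_inferred_model(df)
#   model.load_df(df)
#   model.validate_dataframe()
#   loader = BaseLoader(model, config={'chunk_size': '50', 'worker': '4'})
#   new_ids = loader.boost()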
rpc_thread.py
import threading
class RpcThread(object):
    def __init__(self, max_connection):
        self.semaphore = threading.BoundedSemaphore(max_connection)
        self.max_thread_semaphore = threading.BoundedSemaphore(max_connection * 4)
        self.thread_list = []
    def spawn_thread(self, fun, *args, **kwargs):
        def run(*args, **kwargs):
            self.semaphore.acquire()  # Limit concurrent RPC calls to max_connection
            try:
                fun(*args, **kwargs)
            finally:
                self.semaphore.release()
                self.max_thread_semaphore.release()
        # Throttle thread creation: at most 4x max_connection threads may be alive
        self.max_thread_semaphore.acquire()
        thread = threading.Thread(None, run, None, args=args, kwargs=kwargs)
        thread.start()
        self.thread_list.append(thread)
    def wait(self):
        for t in self.thread_list:
            t.join()
    def thread_number(self):
        return len(self.thread_list)
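# Minimal usage sketch (names are illustrative):
#   pool = RpcThread(4)                   # at most 4 concurrent RPC calls
#   for batch in batches:                 # `batches`: any iterable of work items
#       pool.spawn_thread(load_fn, batch)
#   pool.wait()                           # block until all spawned threads finish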
utils.py
"""Manipulate & Load Pandas DataFreames in an Odoo ETL project""" | |
import re | |
import json | |
import pandas as pd, numpy as np, networkx as nx | |
from functools import wraps | |
from pprint import pformat | |
from os.path import commonprefix | |
import openerplib | |
CONN_PARAMS = [ | |
"odoo_host", | |
"odoo_db", | |
"odoo_username", | |
"odoo_password", | |
"odoo_protocol", | |
"odoo_port" | |
] | |
TYPE_MAP = {
    'boolean': 'boolean',
    'integer': 'bigint',
    'float': 'float',
    'monetary': 'float',
    'char': 'string',
    'text': 'string',
    'html': 'string',
    'date': 'date',
    'datetime': 'date',
    'binary': 'string',
    'selection': 'string',
    'reference': 'string',
    'many2one': 'bigint',
    'one2many': 'string',
    'many2many': 'string',
    'serialized': 'string',
}
# Basic logging
def log(*args, **kwargs):
    context = kwargs.get('context', 'n/a')
    for thing in args:
        if isinstance(thing, dict):
            thing = json.dumps(thing)
        elif not isinstance(thing, basestring):
            thing = str(thing)
        if isinstance(thing, unicode):
            thing = thing.encode('ascii', 'ignore')  # Drop non-ASCII so printing never raises
        print('Odoo export [{}] - {}'.format(context, thing))
def extract_params(config, params, params_lst):
    for k in params_lst:
        if k in config and bool(config[k]):
            params.update({k: config[k]})
            del config[k]
def get_conn_config(config):
    config_dict = {
        'hostname': config.get(CONN_PARAMS[0]),
        'database': config.get(CONN_PARAMS[1]),
        'login'   : config.get(CONN_PARAMS[2]),
        'password': config.get(CONN_PARAMS[3]),
        'protocol': config.get(CONN_PARAMS[4], "xmlrpc"),
        'port'    : int(config.get(CONN_PARAMS[5], "80")),
    }
    if any(x in ("", None) for x in config_dict.values()):
        raise Exception("Mandatory connection parameters are missing.")
    return config_dict
def get_connection(conn_info):
    return openerplib.get_connection(**conn_info)
def check_prefix(prefix, search=re.compile(r'[^a-z0-9.]').search):
    if not bool(search(prefix)):
        return prefix
    raise Exception("Prefix name is invalid.")
def check_modell(modell, search=re.compile(r'[^a-z0-9._]').search):
    if not bool(search(modell)):
        return modell
    raise Exception("Model name is invalid.")
def get_custom_meta(meta):
    return meta['custom']['kv']
def get_id_column(df):
    if '.id' in df.columns:
        return '.id'
    if 'id' in df.columns:
        return 'id'
    raise Exception("No ID column defined. Define one of 'id' (ExtId) or '.id' (DbId)")
def infer_model(df):
    id_column_list = (
        df
        .loc[:, 'id']
        .tolist()
    )
    if len(id_column_list) == 0:
        raise Exception("ID column doesn't have any values.")
    prefix = commonprefix(id_column_list)
    if len(prefix) == 0:
        raise Exception("ID column doesn't have a common prefix in its values.")
    if '.' not in prefix:
        raise Exception('ID column\'s common prefix does not contain a module specification (no "." present).')
    model = (
        prefix
        .split('__')[0]  # Composed keys carry only one base value, e.g. import.product_attribute__attr_val1
        .rstrip('_')
        .split('.')[1]
        .replace('_', '.')
    )
    return model
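# Example of the inference above (a sketch): ids like
# ['import.res_partner__0001', 'import.res_partner__0002'] share the common
# prefix 'import.res_partner__000'; splitting on '__', then on '.', and mapping
# '_' back to '.' yields the model name 'res.partner'.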
class Project(object):
    """Project level helpers"""
    def __init__(self, prefix, config=None):
        self.prefix = check_prefix(prefix)
        self.conn_info = config and get_conn_config(config) or None
        self.rInstance = self.conn_info and get_connection(self.conn_info) or None
        self.mgraph = nx.DiGraph(prefix=self.prefix)
    def make_xid(self, model, row, idxs):
        p = re.compile(r'[\W_]+')
        vals = {
            'prefix': self.prefix,
            'model' : model.replace('.', '_'),
            'iid'   : (
                p.sub(
                    '_', (
                        row
                        .loc[idxs]
                        .astype(str)
                        .str
                        .cat(sep='_', na_rep='?')
                        .lower()
                    )
                )
            )  # pd.Series collapsed to a sanitized string
        }
        return "{prefix}.{model}__{iid}".format(**vals)
    def assign_xid(self, model, df, idxs):
        return df.apply(lambda row: self.make_xid(model, row, idxs), axis=1)
    def get_inferred_model(self, df, meta={}):
        custom = meta and get_custom_meta(meta) or {}
        if '.id' in df.columns and 'model' not in custom:
            raise Exception("If you have a Db primary Id ('.id' column) defined, you need to specify the model in the metadata: {\"model\":\"MODEL.NAME\"}")
        if 'model' in custom:
            return Model(self, custom['model'], meta=meta)
        if 'id' not in df.columns:
            raise Exception("Neither an id column nor a metadata model is defined in this dataframe")
        return Model(self, infer_model(df), meta=meta)
class Model(object):
    """Model level helper"""
    def __repr__(self):
        return '<%s object for %r at %s>' % (
            self.__class__.__name__,
            self.name,
            hex(id(self))
        )
    def __init__(self, Project, model, config=None, meta=None):
        self.project = Project
        self.name = check_modell(model)
        self.meta = meta or {}
        self.custom_meta = meta and get_custom_meta(meta) or {}
        self.id_col = None
        self.df = pd.DataFrame()  # Empty frame until load_df() is called
        self.rInstance = self.project.rInstance
        self.conn_info = config and get_conn_config(config) or self.project.conn_info
        self.rModel = self.conn_info and get_connection(self.conn_info).get_model(self.name) or None
        self.project.mgraph.add_node(self, model=self.name, meta=self.meta)
        if self.rModel:
            try:
                self.info = self.rModel.get_model_info()
                self._parent_name = self.info['_parent_name']
                self._inherits = self.info['_inherits']
                self._fields = self.info['_fields']
            except Exception:
                pass  # The decorators below guard against missing model info
    def need_model_info(func):
        @wraps(func)
        def func_wrapper(*args, **kwargs):
            if not hasattr(args[0], 'info'):
                raise Exception("Model info was not loaded: {!r}".format(args[0]))
            return func(*args, **kwargs)
        return func_wrapper
    def need_dataframe(func):
        @wraps(func)
        def func_wrapper(*args, **kwargs):
            if args[0].df.empty:
                raise Exception("Dataframe is empty. Load with `load_df(df)`. {!r}".format(args[0]))
            return func(*args, **kwargs)
        return func_wrapper
    def make_xid(self, *args):
        return self.project.make_xid(self.name, *args)
    def assign_xid(self, *args):
        return self.project.assign_xid(self.name, *args)
    @need_model_info
    def get_read_schema(self, fields=[], writable=False, required=False):
        if not isinstance(fields, list):
            raise Exception("Fields must be of type list")
        result = {'columns': []}
        for f, attrs in self._fields.iteritems():
            if writable and attrs['readonly']: continue
            if required and not attrs['required']: continue
            if not fields or f in fields:
                result['columns'].append({
                    'name': f,
                    'type': TYPE_MAP[attrs['type']],
                    'comment': str(attrs['string']) + '\n>>\n' + str(pformat(attrs, indent=2))
                })
        return result
    @need_model_info
    def get_group_by_candidates(self):
        result = []
        for f, attrs in self._fields.iteritems():
            if attrs['type'] == 'one2many': result.append(f)
        return result
    @need_model_info
    def load_df(self, df):
        self.id_col = get_id_column(df)
        self.df = df
        for col in df.columns:
            field = col.split('/')[0]
            if field not in self._fields or field == self._parent_name:
                continue
            if 'relation' not in self._fields[field]:
                continue
            relation = self._fields[field]['relation']
            for node, model in list(self.project.mgraph.nodes(data='model')):
                if relation == model:
                    self.project.mgraph.add_edge(self, node)
    @need_model_info
    @need_dataframe
    def get_invalid_column_names(self):
        result = []
        for col in self.df.columns:
            if '.id' == col:  # Db primary Id column
                field = 'id'
            elif '/.' in col:  # DbId relation column
                field = col.split('/.')[0]
            else:
                field = col.split('/')[0]  # ExtId relation column
            result.append(field not in self._fields)
        return result
    @need_model_info
    @need_dataframe
    def get_invalid_relation_column_names(self):
        result = []
        for col in self.df.columns:
            field = col.split('/')[0]
            idstr = col.split('/')[1] if len(col.split('/')) > 1 else None
            if field in self._fields and 'relation' in self._fields[field] and idstr not in ['id', '.id']:
                result.append(col)
        return result
    @need_model_info
    @need_dataframe
    def validate_dataframe(self):
        if True in self.get_invalid_column_names():
            raise Exception("Not all column names are valid in {!r}. Invalids: {!r}".format(
                self.name, self.df.columns[self.get_invalid_column_names()]))
        if u'model' in self.custom_meta:
            if self.custom_meta['model'] != self.name:
                raise Exception("The 'model' metadata key doesn't seem to fit {!r}.".format(self.name))
        elif u'.id' in self.df.columns:
            raise Exception("It seems you have defined a db id column '.id', but haven't defined a model in the df's metadata.")
        elif infer_model(self.df) != self.name:
            raise Exception("The 'id' column doesn't seem to fit {!r}.".format(self.name))
        if len(self.get_invalid_relation_column_names()):
            raise Exception("Relational columns {!r} are not properly formed in {!r}.".format(
                self.get_invalid_relation_column_names(), self.name))
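# A sketch of how the relation graph could drive load ordering (the gist
# itself never consumes mgraph; this is a hypothetical use):
#   import networkx as nx
#   for model in nx.topological_sort(project.mgraph.reverse()):
#       BaseLoader(model).boost()   # load referenced models before referrers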