Skip to content

Instantly share code, notes, and snippets.

@blaggacao
Last active December 22, 2017 01:24
Show Gist options
  • Save blaggacao/b032459295d4d669af10aadcdfef98c2 to your computer and use it in GitHub Desktop.
Odoo Export Threaded
def write_row(self, row):
    """
    Handle one row of data to export.

    Buckets the converted row into ``self.groups`` under a
    ``(group_key, dependency_key)`` composite, opening a new batch index
    whenever the current batch would exceed ``self.chunk_size``, and
    pre-acquires the dependency lock so dependent batches wait until this
    row's batch has been uploaded.

    :param row: a tuple with N strings matching the schema passed to open.
    """
    grouped = self.groupby_idx is not None
    parents = self.parent_field_idx is not None
    # Use a real conditional expression (not ``cond and x or None``) so a
    # falsy converted key such as '' or 0 is preserved instead of
    # silently collapsing to None.
    g_key = convert(row[self.groupby_idx]) if grouped else None
    d_key = convert(row[self.parent_field_idx]) if parents else None
    # One lock per group / per dependency key, created lazily on first use.
    grp_key = (g_key, self.grp_lock_map.setdefault(g_key, threading.Lock()))
    dep_key = (d_key, self.dep_lock_map.setdefault(d_key, threading.Lock()))
    if dep_key[0]:  # Sparse top level graph items (None)
        dep_key[1].acquire(0)  # Else: (Re-)Load the gun
    composite = (grp_key, dep_key)  # (Unlocked, Locked) by default
    try:
        # Peek at the most recent batch for this composite key.
        batch_idx = next(reversed(self.groups[composite]))
        row_count = len(self.groups[composite][batch_idx])
    except (KeyError, StopIteration):  # No batch yet: start a new one
        self.batch_index += 1  # Global counter
        batch_idx = self.batch_index
    else:  # But we also account for chunk size on the existing batch
        chunk_exceeded = row_count + 1 > self.chunk_size
        if chunk_exceeded:
            self.batch_index += 1  # Global counter
            batch_idx = self.batch_index
    # Finally write the data structure; list(...) materializes the row
    # (map() is lazy on Python 3).
    self.groups.setdefault(
        composite, OrderedDict()
    ).setdefault(
        batch_idx, []
    ).append(
        list(map(convert, row))
    )
    self.row_index += 1
def _write_batch(self, batch, extra_columns=[]):
for row in batch:
line = row + extra_columns
self.f.write('\t'.join([str(x) for x in line]) + '\n')
def work(self, ibatch, grp_lock, dep_lock):
    """
    Upload one batch to the server via the Odoo ``load`` RPC.

    Rows that load cleanly have their dependency lock released so child
    batches may proceed; rows reported in ``result['messages']`` stay
    locked and are written to the error file instead.  RPC faults and
    unexpected errors write the whole batch to the error file.

    :param ibatch: ``(batch_idx, batch)`` tuple as stored in self.groups.
    :param grp_lock: group lock of the batch (used for log context only).
    :param dep_lock: dependency lock of the batch (used for log context only).
    """
    st = time()
    batch_idx, batch = ibatch
    # Short, stable digest of the lock pair to correlate log lines of one
    # (group, dependency) composite.
    log_context = "Batch: {:>4} / Len: {:>3} / Grp: {}".format(batch_idx, len(batch), hashlib.md5(str((grp_lock, dep_lock))).hexdigest()[:5])
    log_work = partial(log, context=log_context)
    try:
        # Fresh connection per worker thread; raw-xmlrpc variant kept for reference:
        # result = self.models.execute_kw(self.db, self.uid, self.password, self.model, 'load', [self.headers, batch])
        model = openerplib.get_connection(**self.connection_info).get_model(self.model)
        result = model.load(self.headers, batch, context=self.context)
        # Assume every row loaded fine, then subtract the failed ones.
        to_release = {row[self.id_column_idx] for row in batch}
        if result['messages']:
            log_work(*result['messages'])
            for msg in result['messages']:
                row = batch[msg['record']]
                # BUGFIX: was ``to_release ^= set(row[...])``, which XOR-ed
                # the *characters* of the id string into/out of the set; the
                # intent is to drop this row's id from the release set.
                to_release.discard(row[self.id_column_idx])
                self._write_batch([row], extra_columns=['Value Error', log_context, json.dumps(msg)])
        for r in to_release:
            rc = self.dep_lock_map.get(r, None)  # Last level has no dependency lock defined.
            if rc:
                rc.release()
        if len(batch) != len(result['ids']):
            log_work("Batch was not completely uploaded")
    except xmlrpclib.Fault as e:
        log_work("Host error: " + e.faultString)
        self._write_batch(batch, extra_columns=['Host Error', log_context, e.faultString])
    except Exception as e:
        log_work("Unknown Problem", sys.exc_info()[:2])
        self._write_batch(batch, extra_columns=['Unknown Error', log_context, str(sys.exc_info())])
    log_work("Loading time: {:.4f}".format(time() - st))
def close(self):
    """
    Drain all buffered batches through worker threads, then close the file.

    Repeatedly sweeps ``self.groups``: a composite is dispatched only when
    its group lock is free (at most one in-flight batch per group) and its
    dependency lock has been released by the parent batch's worker.  Empty
    lots are removed from ``self.groups``; the outer loop ends when every
    group has been drained.
    """
    thread = RpcThread(self.worker)

    def get_batch_and_run(lot, composite, grp_lock, dep_lock):
        # Pop the oldest batch of this lot and hand it to a worker thread.
        try:
            batch = lot.popitem(last=False)
        except KeyError:  # lot has become empty
            del self.groups[composite]
        else:
            thread.spawn_thread(self.work, batch, grp_lock, dep_lock)
        finally:
            # After dispatching, free the group lock for rescheduling.
            grp_lock and grp_lock.release()

    def schedule_thread(composite, lot):
        grp_lock = composite[0][1]
        dep_lock = composite[1][1]
        if grp_lock is None and dep_lock is None:  # has no lock at all
            get_batch_and_run(lot, composite, grp_lock, dep_lock)
        elif grp_lock is None or grp_lock.acquire(0):
            if dep_lock and not dep_lock.acquire(0):  # Existing dependency is not resolved yet
                grp_lock and grp_lock.release()  # Cleanup
                return  # Do nothing
            dep_lock and dep_lock.release()  # Keep dependency resolved, if existed
            # BUGFIX: ``composite`` was missing from this call, so any
            # composite with a lock raised TypeError once it became
            # schedulable.
            get_batch_and_run(lot, composite, grp_lock, dep_lock)

    while len(self.groups):
        # Snapshot the items: get_batch_and_run deletes drained composites,
        # and mutating the dict while iterating it raises RuntimeError.
        for composite, lot in list(self.groups.items()):
            schedule_thread(composite, lot)
    thread.wait()
    self.f.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment