Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import metadatastore.conf
from collections import deque
from tqdm import tqdm
import ipyparallel as ipp
def main():
old_config = dict(metadatastore.conf.connection_config)
new_config = old_config.copy()
new_config['database'] = 'metadatastore_production_v1'
rc = ipp.Client()
dview = rc[:]
with dview.sync_imports():
from metadatastore.mds import MDS, MDSRO
old = MDSRO(version=0, config=old_config)
new = MDS(version=1, config=new_config)
dview.push({'old': old, 'new': new})
new._connection.drop_database(new_config['database'])
# Drop all indexes on event collection to speed up insert.
# They will be rebuilt the next time an MDS(RO) object connects.
new._event_col.drop_indexes()
total = old._runstart_col.find().count()
for start in tqdm(old.find_run_starts(), desc='start docs', total=total):
new.insert('start', start)
total = old._runstop_col.find().count()
for stop in tqdm(old.find_run_stops(), desc='stop docs', total=total):
try:
new.insert('stop', stop)
except RuntimeError:
print("error inserting run stop with uid {!r}".format(stop['uid']))
descs = deque()
counts = deque()
total = old._descriptor_col.find().count()
for desc in tqdm(old.find_descriptors(), unit='descriptors', total=total):
d_raw = next(old._descriptor_col.find({'uid': desc['uid']}))
num_events = old._event_col.find(
{'descriptor_id': d_raw['_id']}).count()
new.insert('descriptor', desc)
out = dict(desc)
out['run_start'] = out['run_start']['uid']
descs.append(dict(desc))
counts.append(num_events)
def migrate_event_stream(desc_in, num_events):
if num_events:
# skip empty event stream of bulk insert raises
try:
events = old.get_events_generator(descriptor=desc_in,
convert_arrays=False)
new.bulk_insert_events(descriptor=dict(desc_in),
events=(dict(e) for e in events))
except KeyError:
pass
return num_events
v = rc.load_balanced_view()
amr = v.map(migrate_event_stream, descs, list(counts), ordered=False)
total = sum(counts)
with tqdm(total=total, unit='events') as pbar:
for res in amr:
if res:
pbar.update(res)
if __name__ == '__main__':
main()
from metadatastore.mds import MDS, MDSRO
import metadatastore.conf
from collections import deque
from tqdm import tqdm
def compare(o, n):
try:
assert o['uid'] == n['uid']
assert o == n
except AssertionError:
print(o)
print(n)
raise
def main():
old_config = dict(metadatastore.conf.connection_config)
new_config = old_config.copy()
new_config['database'] = 'metadatastore_production_v1'
old = MDSRO(version=0, config=old_config)
new = MDS(version=1, config=new_config)
total = old._runstart_col.find().count()
old_starts = tqdm(old.find_run_starts(), unit='start docs', total=total,
leave=True)
new_starts = new.find_run_starts()
for o, n in zip(old_starts, new_starts):
compare(o, n)
total = old._runstop_col.find().count()
old_stops = iter(tqdm(old.find_run_stops(), unit='stop docs', total=total))
new_stops = iter(new.find_run_stops())
while True:
try:
o = next(old_stops)
n = next(new_stops)
except StopIteration:
break
while True:
if o['uid'] == n['uid']:
break
print('skipping', o['uid'])
o = next(old_stops)
o = dict(o)
n = dict(n)
if o.get('reason') is None or o['reason'] == '':
o.pop('reason', None)
compare(o, n)
descs = deque()
counts = deque()
total = old._descriptor_col.find().count()
old_descs = tqdm(old.find_descriptors(), unit='descriptors', total=total)
new_descs = new.find_descriptors()
for o, n in zip(old_descs, new_descs):
d_raw = next(old._descriptor_col.find({'uid': o['uid']}))
num_events = old._event_col.find({'descriptor_id': d_raw['_id']}).count()
compare(o, n)
descs.append(o)
counts.append(num_events)
total = sum(counts)
with tqdm(total=total, unit='events') as pbar:
for desc, num_events in zip(descs, counts):
old_events = old.get_events_generator(descriptor=desc,
convert_arrays=False)
new_events = new.get_events_generator(descriptor=desc,
convert_arrays=False)
for ev in zip(old_events, new_events):
assert o == n
pbar.update(num_events)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment