Skip to content

Instantly share code, notes, and snippets.

@danielballan
Last active March 30, 2023 03:50
Show Gist options
  • Save danielballan/b822bebb4e1d7abc5c5cb73b6d9320b7 to your computer and use it in GitHub Desktop.
Save danielballan/b822bebb4e1d7abc5c5cb73b6d9320b7 to your computer and use it in GitHub Desktop.
import metadatastore.conf
from collections import deque
from tqdm import tqdm
import ipyparallel as ipp
def main():
    """Copy a version-0 metadatastore database into a new version-1 database.

    Run starts, run stops and event descriptors are copied serially from this
    process. Event streams are then copied one descriptor per task on the
    engines of an ipyparallel cluster, which must already be running.

    The target database (``metadatastore_production_v1`` on the configured
    connection) is dropped first, so anything already stored in it is lost.
    """
    old_config = dict(metadatastore.conf.connection_config)
    new_config = old_config.copy()
    new_config['database'] = 'metadatastore_production_v1'

    rc = ipp.Client()
    dview = rc[:]
    # Import MDS/MDSRO locally and on every engine.
    with dview.sync_imports():
        from metadatastore.mds import MDS, MDSRO
    old = MDSRO(version=0, config=old_config)
    new = MDS(version=1, config=new_config)
    dview.push({'old': old, 'new': new})

    # Start from an empty target database.
    new._connection.drop_database(new_config['database'])
    # Drop all indexes on event collection to speed up insert.
    # They will be rebuilt the next time an MDS(RO) object connects.
    new._event_col.drop_indexes()

    total = old._runstart_col.find().count()
    for start in tqdm(old.find_run_starts(), desc='start docs', total=total):
        new.insert('start', start)

    total = old._runstop_col.find().count()
    for stop in tqdm(old.find_run_stops(), desc='stop docs', total=total):
        try:
            new.insert('stop', stop)
        except RuntimeError:
            # Report and skip run stops the new store rejects.
            print("error inserting run stop with uid {!r}".format(stop['uid']))

    # Copy descriptors, remembering each one together with its event count so
    # the event streams can be farmed out to the engines afterwards.
    # Plain lists (not deques) so ipyparallel can slice them when partitioning.
    descs = []
    counts = []
    total = old._descriptor_col.find().count()
    for desc in tqdm(old.find_descriptors(), unit='descriptors', total=total):
        # Events are linked to their descriptor through the raw `_id`, so the
        # raw descriptor document is needed to count its events.
        d_raw = next(old._descriptor_col.find({'uid': desc['uid']}))
        num_events = old._event_col.find(
            {'descriptor_id': d_raw['_id']}).count()
        new.insert('descriptor', desc)
        # Queue a plain dict whose run_start is reduced to its uid, instead of
        # the nested run-start document.
        # NOTE(review): assumes the event APIs only need the descriptor's
        # identity, not the dereferenced run start -- confirm.
        out = dict(desc)
        out['run_start'] = out['run_start']['uid']
        descs.append(out)
        counts.append(num_events)

    def migrate_event_stream(desc_in, num_events):
        """Copy all events of one descriptor; return num_events for progress."""
        if num_events:
            # Skip empty event streams, or bulk insert raises.
            try:
                events = old.get_events_generator(descriptor=desc_in,
                                                  convert_arrays=False)
                new.bulk_insert_events(descriptor=dict(desc_in),
                                       events=(dict(e) for e in events))
            except KeyError:
                # Best effort: keep going, but do not lose the stream silently.
                print("error migrating events for descriptor with uid {!r}"
                      .format(desc_in['uid']))
        return num_events

    v = rc.load_balanced_view()
    amr = v.map(migrate_event_stream, descs, counts, ordered=False)
    total = sum(counts)
    with tqdm(total=total, unit='events') as pbar:
        # Results arrive in completion order; each is the stream's event count.
        for res in amr:
            if res:
                pbar.update(res)


if __name__ == '__main__':
    main()
from metadatastore.mds import MDS, MDSRO
import metadatastore.conf
from collections import deque
from tqdm import tqdm
def compare(o, n):
    """Check that an old document and its migrated copy are identical.

    Both documents are printed before failing so the difference can be
    inspected.

    Raises
    ------
    AssertionError
        If the ``uid`` values differ or the documents are not equal.
    KeyError
        If either document has no ``'uid'`` key.
    """
    # Explicit raises instead of ``assert`` statements, so the check still
    # runs when Python is started with -O.
    if o['uid'] != n['uid']:
        problem = 'uid mismatch: {!r} != {!r}'.format(o['uid'], n['uid'])
    elif o != n:
        problem = 'documents with uid {!r} differ'.format(o['uid'])
    else:
        return
    print(o)
    print(n)
    raise AssertionError(problem)
def _zip_strict(old_docs, new_docs, what):
    """Yield (old, new) pairs, failing if one stream is longer than the other.

    Like ``zip`` but raises AssertionError instead of silently dropping the
    surplus of the longer stream. ``what`` names the documents in the message.
    """
    from itertools import zip_longest

    missing = object()
    for o, n in zip_longest(old_docs, new_docs, fillvalue=missing):
        if o is missing or n is missing:
            raise AssertionError(
                'old and new databases hold different numbers of {}'
                .format(what))
        yield o, n


def main():
    """Verify that the version-1 database matches the version-0 source.

    Run starts, run stops, descriptors and events are compared pairwise in the
    order each store returns them. The first mismatch raises AssertionError.
    Run stops present only in the old database are reported and skipped.
    """
    old_config = dict(metadatastore.conf.connection_config)
    new_config = old_config.copy()
    new_config['database'] = 'metadatastore_production_v1'
    old = MDSRO(version=0, config=old_config)
    new = MDS(version=1, config=new_config)

    total = old._runstart_col.find().count()
    old_starts = tqdm(old.find_run_starts(), unit='start docs', total=total,
                      leave=True)
    new_starts = new.find_run_starts()
    for o, n in _zip_strict(old_starts, new_starts, 'run starts'):
        compare(o, n)

    # The new database may lack some run stops, so advance through the old
    # stops until each new stop's uid is found, reporting any passed over.
    total = old._runstop_col.find().count()
    old_stops = iter(tqdm(old.find_run_stops(), unit='stop docs', total=total))
    for n in new.find_run_stops():
        o = next(old_stops, None)
        while o is not None and o['uid'] != n['uid']:
            print('skipping', o['uid'])
            o = next(old_stops, None)
        if o is None:
            raise AssertionError('run stop with uid {!r} not found in old '
                                 'database'.format(n['uid']))
        o = dict(o)
        n = dict(n)
        # An old stop with a None/empty reason is compared as if it had no
        # reason key at all.
        if o.get('reason') is None or o['reason'] == '':
            o.pop('reason', None)
        compare(o, n)

    descs = []
    counts = []
    total = old._descriptor_col.find().count()
    old_descs = tqdm(old.find_descriptors(), unit='descriptors', total=total)
    new_descs = new.find_descriptors()
    for o, n in _zip_strict(old_descs, new_descs, 'descriptors'):
        # Events are linked to their descriptor through the raw `_id`.
        d_raw = next(old._descriptor_col.find({'uid': o['uid']}))
        num_events = old._event_col.find(
            {'descriptor_id': d_raw['_id']}).count()
        compare(o, n)
        descs.append(o)
        counts.append(num_events)

    total = sum(counts)
    with tqdm(total=total, unit='events') as pbar:
        for desc, num_events in zip(descs, counts):
            old_events = old.get_events_generator(descriptor=desc,
                                                  convert_arrays=False)
            new_events = new.get_events_generator(descriptor=desc,
                                                  convert_arrays=False)
            # Compare every event of the stream, and require both stores to
            # hold the same number of events for this descriptor.
            label = 'events for descriptor {!r}'.format(desc['uid'])
            for old_ev, new_ev in _zip_strict(old_events, new_events, label):
                compare(old_ev, new_ev)
            pbar.update(num_events)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment