Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Example usage of umd-mith/veefor
#%%
from collections import Counter
from dataclasses import asdict, fields
from datetime import datetime
import json
from pathlib import Path
import re

from pydantic import ValidationError
import pytz
import ulid

import lakeland_db_migrate_v4 as lv4
import lakeland_db_migrate_v4.source_mappings as sm
# %%
# Number of records expected from each v3 source table. The loading check and
# the post-migration count assertion both compare against these values.
# Presumably they must be updated whenever the source JSON exports change.
MAGIC_NUMBERS = {
    "accessions": 51,
    "files": 10325,
    "items": 4428,
    "entities": 1134,
    "subjects": 94,
    "relationships": 24,
}
# Counter copy of MAGIC_NUMBERS: unknown keys read as 0 instead of raising.
expected_counts = Counter(**MAGIC_NUMBERS)
#%%
def load_all():
    """Load every v3 Airtable JSON export and index its records by Airtable id.

    Each export file is validated with ``lv4.validate_inputs`` against its
    column mapping from ``source_mappings``. The result is one dict mapping
    each validated record's ``airtable_idno`` to the record itself. If two
    records share an id, the one loaded later overwrites the earlier one.
    """
    table_sources = (
        ("Accessions.json", sm.accessions_source_column_mappings),
        ("Files.json", sm.files_source_column_mappings),
        ("Entities.json", sm.entities_source_column_mappings),
        ("Subjects.json", sm.subjects_source_column_mappings),
        ("Relationships.json", sm.relationships_source_column_mappings),
        ("Items.json", sm.items_source_column_mappings),
    )
    records_by_airtable_id = {}
    for json_filename, column_mapping in table_sources:
        validated = lv4.validate_inputs(json_filename, column_mapping)
        records_by_airtable_id.update(
            (record.airtable_idno, record) for record in validated
        )
    return records_by_airtable_id
# %%
airtable_datadict = load_all()
# %%
# Crude check to make sure nothing obvious got screwed up in loading the data:
# every source table must yield exactly its expected number of records.
checker = Counter(type(record).__name__ for record in airtable_datadict.values())
# Expected-count key -> class name of the source records loaded for that table.
SOURCE_RECORD_TYPE_NAMES = {
    "accessions": "AccessionSourceRecord",
    "files": "FileSourceRecord",
    "items": "ItemSourceRecord",
    "entities": "EntitySourceRecord",
    "subjects": "SubjectSourceRecord",
    "relationships": "EntityRelationshipSourceRecord",
}
for table_name, type_name in SOURCE_RECORD_TYPE_NAMES.items():
    assert checker[type_name] == expected_counts[table_name], (
        "loaded {} {} records, expected {}".format(
            checker[type_name], type_name, expected_counts[table_name]
        )
    )
# =========================== CREATE V4 RECORDS ========================================
# %%
def handle_accessions(instance):
    """Build a v4 DonationGroupingRecord from a v3 accession source record.

    Returns ``(v4_record, sidecar)``. The record gets a freshly minted ULID as
    its ``idno``. The sidecar is the dict of source fields not consumed here,
    as produced by ``filter_extra_fields``.

    NOTE(review): the source ``idno`` field is marked as consumed but is not
    copied onto the v4 record, so it is dropped from the output entirely.
    Confirm that is intended.
    """
    consumed_fields = (
        "airtable_idno",
        "airtable_created_time",
        "title",
        "description",
        "donor_name",
        "donation_date",
        "legacy_idno_umd",
        "idno",
        "file_array",
    )
    leftovers = filter_extra_fields(instance, consumed_fields)
    donation_group = lv4.DonationGroupingRecord(
        idno=ulid.new().str,
        v3_airtable_idno=instance.airtable_idno,
        v3_airtable_created_time=instance.airtable_created_time,
        title=instance.title,
        description=instance.description,
        donor_name=instance.donor_name,
        # No source column maps to donor contact details; they start empty.
        donor_email="",
        donor_phone="",
        donation_date=instance.donation_date,
        legacy_idno=instance.legacy_idno_umd,
        # Still v3 Airtable file ids at this point; the linkage pass resolves them.
        v3_files_array=instance.file_array,
    )
    return donation_group, leftovers
#%%
def handle_files(instance):
    """Build a v4 FileRecord from a v3 file source record.

    Only the Airtable id and creation time are consumed. ``donation_grouping_id``
    is filled with the raw v3 ``linked_accession`` value. Because that field is
    not consumed, it also stays in the sidecar, where the linkage pass reads it
    to set the real v4 donation-group idno.

    Returns:
        ``(v4_record, sidecar)`` with the unconsumed source fields.
    """
    leftovers = filter_extra_fields(
        instance, ("airtable_idno", "airtable_created_time")
    )
    file_record = lv4.FileRecord(
        idno=ulid.new().str,
        v3_airtable_idno=instance.airtable_idno,
        v3_airtable_created_time=instance.airtable_created_time,
        donation_grouping_id=instance.linked_accession,
    )
    return file_record, leftovers
#%%
def handle_subjects(instance):
    """Build a v4 SubjectRecord from a v3 subject source record.

    The v3 ``category`` becomes the v4 ``subject_type``. Every other
    non-empty source field stays in the sidecar. That includes
    ``linked_items_array``, which the linkage pass reads.

    Returns:
        ``(v4_record, sidecar)``.
    """
    leftovers = filter_extra_fields(
        instance,
        ("airtable_idno", "airtable_created_time", "name", "category"),
    )
    subject = lv4.SubjectRecord(
        idno=ulid.new().str,
        v3_airtable_idno=instance.airtable_idno,
        v3_airtable_created_time=instance.airtable_created_time,
        name=instance.name,
        subject_type=instance.category,
    )
    return subject, leftovers
#%%
def handle_entities(instance):
    """Build a v4 EntityRecord from a v3 entity source record.

    Field mapping:
    - ``category`` -> ``entity_type``.
    - ``lchp_source_code`` -> ``legacy_idno_lchp``.
    - ``name``, ``alt_name`` and ``bio_hist`` copy across unchanged.

    NOTE(review): ``idno`` is marked consumed but not copied onto the v4
    record, so it is dropped from the output. Confirm that is intended.

    Returns:
        ``(v4_record, sidecar)`` with the unconsumed source fields.
    """
    consumed_fields = (
        "airtable_idno",
        "airtable_created_time",
        "name",
        "alt_name",
        "bio_hist",
        "category",
        "idno",
        "lchp_source_code",
    )
    leftovers = filter_extra_fields(instance, consumed_fields)
    entity = lv4.EntityRecord(
        idno=ulid.new().str,
        v3_airtable_idno=instance.airtable_idno,
        v3_airtable_created_time=instance.airtable_created_time,
        name=instance.name,
        alt_name=instance.alt_name,
        bio_hist=instance.bio_hist,
        entity_type=instance.category,
        legacy_idno_lchp=instance.lchp_source_code,
    )
    return entity, leftovers
#%%
def handle_relationships(instance):
    """Build a v4 EntityRelationshipRecord from a v3 relationship source record.

    ``entity_1`` and ``entity_2`` become the subject and object entities, and
    ``relation_type`` becomes the predicate. The entity values are copied as-is
    (v3 Airtable ids); the linkage pass later replaces them with v4 idnos.

    Returns:
        ``(v4_record, sidecar)`` with the unconsumed source fields.
    """
    consumed_fields = (
        "airtable_idno",
        "airtable_created_time",
        "entity_1",
        "entity_2",
        "name",
        "relation_type",
    )
    leftovers = filter_extra_fields(instance, consumed_fields)
    relationship = lv4.EntityRelationshipRecord(
        idno=ulid.new().str,
        v3_airtable_idno=instance.airtable_idno,
        v3_airtable_created_time=instance.airtable_created_time,
        name=instance.name,
        subject_entity=instance.entity_1,
        object_entity=instance.entity_2,
        relationship_predicate=instance.relation_type,
    )
    return relationship, leftovers
#%%
def filter_extra_fields(instance, consumed_keys):
    """Return the fields of dataclass *instance* that a handler did not consume.

    The result maps field name -> value, in field order, for every entry of
    ``asdict(instance)`` whose name is not in *consumed_keys*. Values equal to
    ``[]``, ``""`` or ``False`` are skipped. The comparison uses ``==``, so
    numeric zeros (``0 == False``) are skipped as well, while ``None`` is kept.

    NOTE(review): confirm that dropping numeric zero values is intended.
    """
    blank_values = ([], "", False)
    return {
        name: value
        for name, value in asdict(instance).items()
        if name not in consumed_keys and value not in blank_values
    }
#%%
def handle_item_types(instance, matcher):
    """Collect an item's type and category values into one flat list.

    Args:
        instance: source record to read from.
        matcher: ordered pair ``(type_attr, category_attr)`` naming the two
            attributes of *instance* to read. Pass an ordered sequence (tuple
            or list), not a set. The pair is unpacked positionally and fixes
            the order of the result.

    Each attribute may hold a single string or an iterable of strings; ``None``
    is treated as absent. Values are returned type-first, then category, with
    falsy entries (``""``, ``None``) removed.

    Raises:
        TypeError: if an attribute holds a non-string, non-iterable value.
    """
    type_attr, category_attr = matcher
    flattened_types = []
    for attr_name in (type_attr, category_attr):
        value = getattr(instance, attr_name)
        if isinstance(value, str):
            flattened_types.append(value)
        elif value is not None:
            # Lists contribute each element. Use extend(), not `+`, whose
            # result would be discarded.
            flattened_types.extend(value)
    return list(filter(None, flattened_types))
#%%
def handle_collections(collection_name):
    """Blank out collection names that begin with the Lakeland AACP prefix.

    Returns ``""`` when *collection_name* starts with
    "Lakeland - African Americans in College Park". Any other name is returned
    unchanged. *collection_name* must be a string.
    """
    lakeland_prefix = "Lakeland - African Americans in College Park"
    return "" if collection_name.startswith(lakeland_prefix) else collection_name
#%%
def handle_item(instance: lv4.sources.ItemSourceRecord) -> tuple:
    """Migrate a v3 item source record to a v4 ItemRecord.

    Returns a ``(v4_record, sidecar)`` pair:
    - The new ItemRecord, with a freshly minted ULID ``idno``.
    - The unconsumed source fields from ``filter_extra_fields``. Link fields
      (people, subjects, ...) stay there as v3 Airtable ids for the later
      linkage pass.

    NOTE(review): ``legacy_idno_umd`` is marked consumed but not copied onto
    the v4 record. Confirm it is intentionally dropped.

    Raises:
        ValidationError: if ItemRecord rejects the mapped values. The error
            and the computed item type are printed before re-raising.
    """
    keys_to_init = {
        "airtable_idno",
        "airtable_created_time",
        "legacy_idno_umd",
        "title",
        "description",
        "created_date",
    }
    # (type attribute, category attribute). This must be an ordered tuple:
    # handle_item_types unpacks it positionally, and a set of strings iterates
    # in an order that can change between runs under hash randomization.
    types_cats = ("obj_type", "category")
    collection_keys = {"collection"}
    v4_type = handle_item_types(instance, types_cats)
    all_consumed_keys = [*keys_to_init, *types_cats, *collection_keys]
    sidecar_data = filter_extra_fields(instance, all_consumed_keys)
    try:
        v4_item_record = lv4.ItemRecord(
            idno=ulid.new().str,
            v3_airtable_idno=instance.airtable_idno,
            v3_airtable_created_time=instance.airtable_created_time,
            title=instance.title,
            description=instance.description,
            v3_created_date=instance.created_date,
            item_type=v4_type,
            collection=handle_collections(instance.collection),
        )
    except ValidationError as err:
        # Print context for debugging, then propagate. There is no record to
        # return, and falling through would raise UnboundLocalError instead.
        print(err)
        print(v4_type)
        raise
    return (v4_item_record, sidecar_data)
#%%
# Migration output keyed by v3 Airtable id; each value is (v4_record, sidecar).
V4_RECORDS = {}


def migrate(input_pair):
    """Convert one ``(airtable_id, source_record)`` pair and store it in V4_RECORDS.

    The source record is dispatched on its class to the matching ``handle_*``
    function. The resulting ``(v4_record, sidecar)`` tuple is stored under the
    v4 record's ``v3_airtable_idno``. Every class in the table is checked, not
    just the first match. Records of an unrecognized class are silently ignored.
    """
    _airtable_id, source_record = input_pair
    handlers = (
        (lv4.sources.AccessionSourceRecord, handle_accessions),
        (lv4.sources.FileSourceRecord, handle_files),
        (lv4.sources.SubjectSourceRecord, handle_subjects),
        (lv4.sources.EntitySourceRecord, handle_entities),
        (lv4.sources.EntityRelationshipSourceRecord, handle_relationships),
        (lv4.sources.ItemSourceRecord, handle_item),
    )
    for source_class, handler in handlers:
        if not isinstance(source_record, source_class):
            continue
        migrated = handler(source_record)
        V4_RECORDS[migrated[0].v3_airtable_idno] = migrated
# %%
for v3_airtable_idno, source_record in airtable_datadict.items():
    migrate((v3_airtable_idno, source_record))
# %%
# V4_RECORDS must hold exactly as many entries as the expected source total.
migrated_total = len(V4_RECORDS)
assert migrated_total == sum(expected_counts.values())
migrated_total
# =========================== RESOLVE RECORD LINKAGES ========================================
# %%
# NOTE(review): not referenced anywhere in this file.
airtable_id_regex = re.compile(r"rec[a-zA-Z0-9]{14}")
#%%
# Linkage pass. V4 records were created holding v3 Airtable ids in their link
# fields or sidecars. Here those ids are translated into the ULID ``idno`` of
# the corresponding migrated record, and the consumed keys are dropped from
# each sidecar. Results are keyed by the v4 ``idno``.
RESOLVED_V4_RECORDS = {}
sidecar_key_collector = []

# Item sidecar keys that hold v3 links, each paired with the ItemRecord list
# attribute it resolves into. People and places/orgs both feed linked_entities.
_ITEM_LINK_FIELDS = (
    ("linked_people", "linked_entities"),
    ("linked_entity_source", "linked_entities_as_sources"),
    ("linked_places_orgs", "linked_entities"),
    ("linked_subjects", "linked_subjects"),
    ("linked_entity_interviewees", "linked_entities_as_interviewees"),
    ("linked_entity_interviewers", "linked_entities_as_interviewers"),
)


def _resolve_v4_id(v3_id, context):
    """Return the v4 ``idno`` migrated from v3 Airtable id *v3_id*, or None.

    A missing id is printed rather than raised, so one dangling link cannot
    discard the rest. The message names *context*, the v3 id of the record
    that holds the link.
    """
    try:
        return V4_RECORDS[v3_id][0].idno
    except KeyError:
        print("Errored on {}: no migrated record for {!r}".format(context, v3_id))
        return None


def _resolve_v4_ids(value, context):
    """Resolve a link value holding one v3 id (str) or several (list) to v4 idnos.

    Empty values (None, "", []) give an empty list. A bare string is treated
    as a single id and is never iterated character by character. Ids that
    cannot be resolved are reported and skipped.

    Raises:
        RuntimeError: if *value* is neither a string nor a list/tuple.
    """
    if not value:
        return []
    if isinstance(value, str):
        value = [value]
    elif not isinstance(value, (list, tuple)):
        raise RuntimeError(
            "Got a link value of invalid type on {}: {!r}".format(context, value)
        )
    resolved = (_resolve_v4_id(v3_id, context) for v3_id in value)
    return [idno for idno in resolved if idno is not None]


def _without_keys(sidecar, used_keys):
    """Return a copy of *sidecar* minus the keys consumed during resolution."""
    return {k: v for k, v in sidecar.items() if k not in used_keys}


def _resolve_donation_group(rec, sidecar):
    """Return the sidecar extended with ``v4_files``, the group's v4 file idnos."""
    v4_files = _resolve_v4_ids(rec.v3_files_array, rec.v3_airtable_idno)
    return {**sidecar, "v4_files": v4_files}


def _resolve_file(rec, sidecar):
    """Point the file at its v4 donation group and, if it has one, its v4 item.

    ``linked_accession`` is required: a missing key or an unmigrated accession
    raises KeyError. ``part_of_item`` is optional; an unresolvable item id is
    reported and leaves ``item_id`` untouched.
    """
    rec.donation_grouping_id = V4_RECORDS[sidecar["linked_accession"]][0].idno
    part_of_item = sidecar.get("part_of_item")
    if part_of_item:
        item_idno = _resolve_v4_id(part_of_item, rec.v3_airtable_idno)
        if item_idno is not None:
            rec.item_id = item_idno
    return _without_keys(sidecar, {"linked_accession", "part_of_item"})


def _resolve_subject(rec, sidecar):
    """Set ``linked_items`` to the v4 idnos of the subject's linked items.

    A subject with no ``linked_items_array`` gets an empty list.
    """
    rec.linked_items = _resolve_v4_ids(
        sidecar.get("linked_items_array"), rec.v3_airtable_idno
    )
    return _without_keys(sidecar, {"linked_items_array"})


def _resolve_relationship(rec, sidecar):
    """Swap the relationship's v3 entity ids for v4 entity idnos.

    An endpoint whose entity was not migrated is reported and keeps its v3 id.
    """
    subject_idno = _resolve_v4_id(rec.subject_entity, rec.v3_airtable_idno)
    if subject_idno is not None:
        rec.subject_entity = subject_idno
    object_idno = _resolve_v4_id(rec.object_entity, rec.v3_airtable_idno)
    if object_idno is not None:
        rec.object_entity = object_idno
    return sidecar


def _resolve_item(rec, sidecar):
    """Append the v4 idnos of each linked entity/subject to the item's link lists."""
    for sidecar_key, target_attr in _ITEM_LINK_FIELDS:
        resolved = _resolve_v4_ids(sidecar.get(sidecar_key), rec.v3_airtable_idno)
        getattr(rec, target_attr).extend(resolved)
    return _without_keys(sidecar, {key for key, _ in _ITEM_LINK_FIELDS})


for rec, sidecar in V4_RECORDS.values():
    if isinstance(rec, lv4.DonationGroupingRecord):
        RESOLVED_V4_RECORDS[rec.idno] = (rec, _resolve_donation_group(rec, sidecar))
    if isinstance(rec, lv4.FileRecord):
        RESOLVED_V4_RECORDS[rec.idno] = (rec, _resolve_file(rec, sidecar))
    if isinstance(rec, lv4.SubjectRecord):
        RESOLVED_V4_RECORDS[rec.idno] = (rec, _resolve_subject(rec, sidecar))
    if isinstance(rec, lv4.EntityRecord):
        # Entities carry no links that need resolving.
        RESOLVED_V4_RECORDS[rec.idno] = (rec, sidecar)
    if isinstance(rec, lv4.EntityRelationshipRecord):
        RESOLVED_V4_RECORDS[rec.idno] = (rec, _resolve_relationship(rec, sidecar))
    if isinstance(rec, lv4.ItemRecord):
        RESOLVED_V4_RECORDS[rec.idno] = (rec, _resolve_item(rec, sidecar))
#%%
# Sanity check: resolution should produce as many entries as were migrated.
assert len(RESOLVED_V4_RECORDS) == len(V4_RECORDS)
items = [
    pair[0]
    for pair in RESOLVED_V4_RECORDS.values()
    if isinstance(pair[0], lv4.ItemRecord)
]
# Items with no entity links of any kind. linked_subjects is not counted here.
items_with_no_links = [
    item
    for item in items
    if not (
        len(item.linked_entities)
        + len(item.linked_entities_as_interviewers)
        + len(item.linked_entities_as_interviewees)
        + len(item.linked_entities_as_sources)
    )
]
print("{} items had no linkages".format(len(items_with_no_links)))
# =========================== PERSIST V4 RECORDS ========================================
#%%
TZ = pytz.timezone("US/Eastern")
# Destination for the migrated records, relative to the working directory.
OUTPUT_PATH = Path("outputs") / "migrated_records.json"
to_json = []
# v4 record class name -> table label stored on each output row.
table_keys = {
    "DonationGroupingRecord": "Donation Groups",
    "FileRecord": "Files",
    "EntityRecord": "Entities",
    "SubjectRecord": "Subjects",
    "EntityRelationshipRecord": "Relationships",
    "ItemRecord": "Items",
}
for rrec, rsidecar in RESOLVED_V4_RECORDS.values():
    # Each row records the moment it was serialized, in US/Eastern time.
    record_time = datetime.now(TZ)
    to_json.append(
        {
            **asdict(rrec),
            "table_key": table_keys[type(rrec).__name__],
            "migrated_at": record_time.isoformat(),
            "extra_fields": rsidecar,
        }
    )
# open() does not create parent directories, so make sure outputs/ exists.
OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
with open(OUTPUT_PATH, "w", encoding="utf-8") as outfile:
    json.dump(to_json, outfile)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment