Created
March 27, 2023 18:37
-
-
Save vphill/1edce8747364c30e26e45159136fa117 to your computer and use it in GitHub Desktop.
Script to convert OAI-PMH repository for the TFC to CSV
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""untl_breaker script for processing OAI-PMH 2.0 Repository XML Files""" | |
import argparse | |
import sys | |
from xml.etree import ElementTree | |
import csv | |
# XML namespace for UNTL metadata elements, in ElementTree "Clark notation"
# ({uri}tag) so it can be prepended directly to tag names in findall().
UNTL_NAMESPACE = "{http://digital2.library.unt.edu/untl/}"
# Prefix -> namespace map for namespace-aware ElementTree lookups.
UNTL_NSMAP = {"untl": UNTL_NAMESPACE}
# UNTL fields whose value lives in a nested <name> child element rather
# than in the element's own text.
NAME_FIELDS = ["creator", "contributor", "publisher"]
# Local TSV file of place-name / latitude / longitude rows used to attach
# lat/long values to placeName coverage entries.
AUBREY_PLACES_PATH = "/Users/Mark/Data/tfc/code/aubrey_titles_place.tsv"
def load_untl_places(path=None):
    """Load the place-name -> "lat, lon" lookup table from a TSV file.

    Args:
        path: Tab-separated file with one ``place<TAB>lat<TAB>lon`` row per
            line. Defaults to AUBREY_PLACES_PATH when not given (the None
            sentinel keeps the original no-argument call working).

    Returns:
        dict mapping each place name to a ``"lat, lon"`` string.
    """
    if path is None:
        path = AUBREY_PLACES_PATH
    places = {}
    with open(path, newline='') as place_file:
        # csv.reader does the tab splitting; skipping short rows avoids the
        # ValueError the original raised on blank or malformed lines.
        for row in csv.reader(place_file, delimiter='\t'):
            if len(row) < 3:
                continue
            untl, lat, lon = row[0], row[1], row[2]
            places[untl] = f"{lat}, {lon}"
    return places
class Record:
    """Base class for a UNTL metadata record in an OAI-PMH Repository file.

    ``elem`` is an OAI-PMH <record> element; the UNTL metadata root is read
    from elem[1][0] (record -> metadata -> untl:metadata), matching how the
    original accessors indexed into the element.
    """

    def __init__(self, elem, options):
        self.elem = elem
        self.options = options
        # Cache the untl:metadata element; every accessor reads from it.
        self.metadata = elem[1][0]

    def get_meta_id(self):
        """Return the record's ARK identifier, or '' if none is present.

        Fix: the original raised UnboundLocalError when no meta element
        with qualifier "ark" existed.
        """
        for meta in self.metadata.findall(UNTL_NAMESPACE + "meta"):
            if meta.get("qualifier") == "ark":
                return meta.text
        return ''

    def get_main_title(self):
        """Return the official title, falling back to the first title.

        Returns '' for a record with no title elements (the original raised
        IndexError in that case).
        """
        titles = self.metadata.findall(UNTL_NAMESPACE + "title")
        if not titles:
            return ''
        for title in titles:
            if title.get("qualifier") == "officialtitle":
                return title.text
        return titles[0].text

    def get_creation_date(self):
        """Return the creation date, falling back to the first date.

        Returns '' when the record has no date elements at all.
        """
        dates = self.metadata.findall(UNTL_NAMESPACE + "date")
        if not dates:
            return ''
        for date in dates:
            if date.get("qualifier") == "creation":
                return date.text
        return dates[0].text

    def _get_names(self, qualifier):
        """Return stripped <name> values from creator and contributor
        elements whose qualifier matches.

        Shared implementation for get_designers/get_donors, which were
        copy-paste duplicates differing only in the qualifier string.
        """
        names = []
        for field in ("creator", "contributor"):
            for element in self.metadata.findall(UNTL_NAMESPACE + field):
                if element.get("qualifier") == qualifier:
                    names.append(
                        element.findtext(UNTL_NAMESPACE + "name", "").strip())
        return names

    def get_designers(self):
        """Return all designer (qualifier "dsr") names for the record."""
        return self._get_names("dsr")

    def get_donors(self):
        """Return all donor (qualifier "dnr") names for the record."""
        return self._get_names("dnr")

    def get_placenames(self):
        """Return all non-empty placeName coverage values for the record.

        Empty coverage elements (text is None) are skipped instead of
        contributing None entries as before.
        """
        return [coverage.text
                for coverage in self.metadata.findall(UNTL_NAMESPACE + "coverage")
                if coverage.get("qualifier") == "placeName" and coverage.text]

    def get_type(self):
        """Return the first resourceType value, or '' when absent.

        Fix: the original raised IndexError on records without a
        resourceType element.
        """
        resource_types = self.metadata.findall(UNTL_NAMESPACE + "resourceType")
        return resource_types[0].text if resource_types else ''

    def get_record_status(self):
        """Return the record status, either "deleted" or (default) "active"."""
        return self.elem.find("header").get("status", "active")

    def get_elements(self):
        """Yield {"value", "qualifier"} dicts for self.options.element.

        When self.options.qualifier is set, only elements with that exact
        qualifier are yielded; elements with an empty value are skipped.
        """
        for element in self.metadata.findall(
                UNTL_NAMESPACE + self.options.element):
            if self.options.element in NAME_FIELDS:
                # Name fields have an additional nesting we need to deal with.
                value = element.findtext(UNTL_NAMESPACE + "name", "").strip()
            else:
                # Guard: element.text is None for empty elements.
                value = (element.text or "").strip()
            value = value.replace("\t", " ").replace("\n", " ")
            # If "value" is empty we want to skip the element.
            if not value:
                continue
            element_dict = {"value": value,
                            "qualifier": element.get("qualifier", 'None')}
            # If a specific qualifier was requested, yield only matches;
            # otherwise yield everything.
            if (self.options.qualifier
                    and self.options.qualifier != element_dict["qualifier"]):
                continue
            yield element_dict

    def get_all_data(self):
        """Yield (tag, qualifier, value) tuples for every metadata element
        that has a non-empty value."""
        for element in self.metadata:
            if element.tag.replace(UNTL_NAMESPACE, '') in NAME_FIELDS:
                text = element.findtext(UNTL_NAMESPACE + "name", "").strip()
            else:
                # Guard: element.text is None for empty elements.
                text = (element.text or '').strip()
            if text:
                value = text.replace("\t", " ").replace("\n", " ")
                yield (element.tag, element.get("qualifier", None), value)

    def has_element(self):
        """Return True if the record has any value in the metadata element
        named by self.options.element, False otherwise."""
        for element in self.metadata.findall(
                UNTL_NAMESPACE + self.options.element):
            if element.text:
                return True
        return False
def main():
    """Main file handling and option handling.

    Walks the OAI-PMH repository file with iterparse, and for each <record>
    collects the ark, thumbnail URL, main title, creation date, up to two
    designers/donors/places (with lat/long lookups), and resource type, then
    writes every row to a CSV file.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("filename", type=str,
                        help="OAI-PMH UNTL Repository File")
    # New optional flag; the default preserves the original hard-coded name.
    parser.add_argument("-o", "--output", type=str, default="my_file.csv",
                        help="CSV file to write (default: my_file.csv)")
    args = parser.parse_args()

    rows = []
    untl_places = load_untl_places()
    for _event, elem in ElementTree.iterparse(args.filename):
        if elem.tag != "record":
            continue
        record = Record(elem, args)
        meta_id = record.get_meta_id()
        record_dict = {
            'ark': meta_id,
            'thumbnail': f"https://digital.library.unt.edu/{meta_id}/small/",
            'main_title': record.get_main_title(),
            'date': record.get_creation_date(),
        }
        # Fix: the original only set designer1/donor1/place1 when exactly
        # ONE value was present, so records with two values silently lost
        # their first designer/donor/place.
        designers = record.get_designers()[:2]
        if designers:
            record_dict['designer1'] = designers[0]
        if len(designers) == 2:
            record_dict['designer2'] = designers[1]
        donors = record.get_donors()[:2]
        if donors:
            record_dict['donor1'] = donors[0]
        if len(donors) == 2:
            record_dict['donor2'] = donors[1]
        places = record.get_placenames()[:2]
        if places:
            record_dict['place1'] = places[0]
            record_dict['place1-lat-long'] = untl_places.get(places[0], '')
        if len(places) == 2:
            record_dict['place2'] = places[1]
            record_dict['place2-lat-long'] = untl_places.get(places[1], '')
        record_dict['type'] = record.get_type()
        rows.append(record_dict)
        # Release the processed element so iterparse stays memory-bounded
        # on large repository files.
        elem.clear()

    fieldnames = ['ark', 'thumbnail', 'main_title', 'date',
                  'designer1', 'designer2', 'donor1', 'donor2',
                  'place1', 'place1-lat-long', 'place2', 'place2-lat-long',
                  'type']
    with open(args.output, 'w', newline='') as out_file:
        writer = csv.DictWriter(out_file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment