Skip to content

Instantly share code, notes, and snippets.

@vphill
Created March 27, 2023 18:37
Show Gist options
  • Save vphill/1edce8747364c30e26e45159136fa117 to your computer and use it in GitHub Desktop.
Save vphill/1edce8747364c30e26e45159136fa117 to your computer and use it in GitHub Desktop.
Script to convert OAI-PMH repository for the TFC to CSV
"""untl_breaker script for processing OAI-PMH 2.0 Repository XML Files"""
import argparse
import sys
from xml.etree import ElementTree
import csv
# UNTL namespace in ElementTree's "{uri}" (Clark notation) form, so it can be
# prepended directly to a local tag name: UNTL_NAMESPACE + "title".
UNTL_NAMESPACE = "{http://digital2.library.unt.edu/untl/}"
# NOTE(review): not referenced anywhere in the visible code. The value is the
# brace-wrapped form; a prefix->URI map for ElementTree's ``namespaces=``
# argument would normally hold the bare URI -- confirm before using it.
UNTL_NSMAP = {"untl": UNTL_NAMESPACE}
# Elements whose value is held in a nested <untl:name> child instead of in the
# element's own text.
NAME_FIELDS = ["creator", "contributor", "publisher"]
# Tab-separated file read by load_untl_places(): place name, latitude, longitude.
# NOTE(review): absolute path on one machine -- adjust before running elsewhere.
AUBREY_PLACES_PATH = "/Users/Mark/Data/tfc/code/aubrey_titles_place.tsv"
def load_untl_places(path=None):
    """Load the place-name to coordinates lookup table.

    The file is tab-separated with exactly three columns per line:
    place name, latitude, longitude.  Blank lines are ignored.

    Args:
        path: Location of the TSV file.  Defaults to ``AUBREY_PLACES_PATH``
            when omitted, which keeps existing zero-argument callers working.

    Returns:
        A dict mapping each place name to the string ``"<lat>, <lon>"``.

    Raises:
        ValueError: If a non-blank line does not have exactly three
            tab-separated fields.
    """
    if path is None:
        # Resolved at call time so the default follows the module constant.
        path = AUBREY_PLACES_PATH
    places = {}
    with open(path, encoding="utf-8") as handle:
        # Iterate the file lazily instead of materialising it with readlines().
        for line_number, raw_line in enumerate(handle, start=1):
            line = raw_line.strip()
            if not line:
                # A trailing or stray blank line must not abort the whole load.
                continue
            fields = line.split("\t")
            if len(fields) != 3:
                raise ValueError(
                    f"{path}:{line_number}: expected 3 tab-separated fields, "
                    f"got {len(fields)}"
                )
            untl, lat, lon = fields
            places[untl] = f"{lat}, {lon}"
    return places
class Record:
    """Wrapper around one UNTL metadata record from an OAI-PMH repository file.

    Every accessor reads the UNTL fields from ``elem[1][0]``, i.e. the first
    child of the record's second child.  A record without that structure
    raises ``IndexError`` from the accessors.
    """

    def __init__(self, elem, options):
        """Store the record element and the option object.

        Args:
            elem: The ``<record>`` element.
            options: Object whose ``element`` and ``qualifier`` attributes
                are read by ``get_elements`` and ``has_element``; the other
                accessors do not use it.
        """
        self.elem = elem
        self.options = options

    def _find_untl(self, tag):
        """Return all direct UNTL children of the metadata node named *tag*."""
        return self.elem[1][0].findall(UNTL_NAMESPACE + tag)

    def _names_with_qualifier(self, qualifier):
        """Return stripped names of creators, then contributors, whose
        ``qualifier`` attribute equals *qualifier*."""
        names = []
        for tag in ("creator", "contributor"):
            for agent in self._find_untl(tag):
                if agent.get("qualifier") == qualifier:
                    names.append(
                        agent.findtext(UNTL_NAMESPACE + "name", "").strip()
                    )
        return names

    @staticmethod
    def _clean(text):
        """Strip *text* and flatten tabs/newlines to spaces; ``None`` -> ''."""
        # element.text is None for empty elements, so guard before .strip().
        return (text or "").strip().replace("\t", " ").replace("\n", " ")

    def get_meta_id(self):
        """Return the text of the ``meta`` element qualified ``ark``.

        Returns ``None`` when the record has no such element.
        """
        for meta in self._find_untl("meta"):
            if meta.get("qualifier") == "ark":
                return meta.text
        return None

    def get_main_title(self):
        """Return the main title from the record.

        The first title qualified ``officialtitle`` wins; otherwise the
        first title is used.  Returns ``''`` when there are no titles.
        """
        titles = self._find_untl("title")
        if not titles:
            return ""
        for title in titles:
            if title.get("qualifier") == "officialtitle":
                return title.text
        return titles[0].text

    def get_creation_date(self):
        """Return the creation date from the record.

        The first date qualified ``creation`` wins; otherwise the first
        date is used.  Returns ``''`` when there are no dates.
        """
        dates = self._find_untl("date")
        if not dates:
            return ""
        for date in dates:
            if date.get("qualifier") == "creation":
                return date.text
        return dates[0].text

    def get_designers(self):
        """Return all designers (qualifier ``dsr``) for the record."""
        return self._names_with_qualifier("dsr")

    def get_donors(self):
        """Return all donors (qualifier ``dnr``) for the record."""
        return self._names_with_qualifier("dnr")

    def get_placenames(self):
        """Return the text of every coverage element qualified ``placeName``."""
        return [
            coverage.text
            for coverage in self._find_untl("coverage")
            if coverage.get("qualifier") == "placeName"
        ]

    def get_type(self):
        """Return the first ``resourceType`` value, or ``''`` if absent."""
        resource_types = self._find_untl("resourceType")
        if not resource_types:
            return ""
        return resource_types[0].text

    def get_record_status(self):
        """Return the header's ``status`` attribute, defaulting to ``active``."""
        return self.elem.find("header").get("status", "active")

    def get_elements(self):
        """Yield a dict for each non-empty instance of ``options.element``.

        Each dict has a ``value`` key (cleaned text) and a ``qualifier`` key
        (the string ``'None'`` when the attribute is missing).  When
        ``options.qualifier`` is set, only matching instances are yielded.
        """
        wanted_qualifier = self.options.qualifier
        # Name fields have an additional nesting we need to deal with.
        is_name_field = self.options.element in NAME_FIELDS
        for element in self._find_untl(self.options.element):
            if is_name_field:
                raw_value = element.findtext(UNTL_NAMESPACE + "name", "")
            else:
                raw_value = element.text
            value = self._clean(raw_value)
            # If the value is empty we want to skip the element.
            if not value:
                continue
            qualifier = element.get("qualifier", 'None')
            # Yield everything unless a specific qualifier was requested.
            if not wanted_qualifier or wanted_qualifier == qualifier:
                yield {"value": value, "qualifier": qualifier}

    def get_all_data(self):
        """Yield ``(tag, qualifier, value)`` for every non-empty child element.

        ``tag`` is the namespace-qualified tag, ``qualifier`` is ``None``
        when the attribute is missing, and ``value`` is the cleaned text.
        """
        for element in self.elem[1][0]:
            if element.tag.replace(UNTL_NAMESPACE, '') in NAME_FIELDS:
                raw_value = element.findtext(UNTL_NAMESPACE + "name", "")
            else:
                raw_value = element.text
            value = self._clean(raw_value)
            if value:
                yield (element.tag, element.get("qualifier", None), value)

    def has_element(self):
        """Return True if any instance of ``options.element`` has text."""
        return any(
            element.text for element in self._find_untl(self.options.element)
        )
def main():
    """Convert an OAI-PMH UNTL repository file into ``my_file.csv``.

    Reads the repository file named on the command line, builds one CSV row
    per record (ark, thumbnail URL, main title, date, up to two designers,
    donors and places with their coordinates, and the resource type) and
    writes all rows to ``my_file.csv`` in the current directory.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("filename", type=str,
                        help="OAI-PMH UNTL Repository File")
    args = parser.parse_args()

    rows = []
    untl_places = load_untl_places()
    for _event, elem in ElementTree.iterparse(args.filename):
        if elem.tag != "record":
            continue

        # Records flagged as deleted carry only a header, so there is no
        # metadata to extract; skip them instead of failing on elem[1].
        header = elem.find("header")
        if header is not None and header.get("status") == "deleted":
            elem.clear()
            continue

        record = Record(elem, args)
        meta_id = record.get_meta_id()
        record_dict = {
            'ark': meta_id,
            'thumbnail': f"https://digital.library.unt.edu/{meta_id}/small/",
            'main_title': record.get_main_title(),
            'date': record.get_creation_date(),
        }

        # Fill the numbered columns for the first two values of each kind.
        # Enumerating guarantees column 1 is written whenever column 2 is.
        for position, designer in enumerate(record.get_designers()[:2], start=1):
            record_dict[f'designer{position}'] = designer
        for position, donor in enumerate(record.get_donors()[:2], start=1):
            record_dict[f'donor{position}'] = donor
        for position, place in enumerate(record.get_placenames()[:2], start=1):
            record_dict[f'place{position}'] = place
            record_dict[f'place{position}-lat-long'] = untl_places.get(place, '')

        record_dict['type'] = record.get_type()
        rows.append(record_dict)

        # Everything needed has been copied out as strings; release the
        # subtree so memory use does not grow with the size of the file.
        elem.clear()

    fieldnames = ['ark', 'thumbnail', 'main_title', 'date', 'designer1', 'designer2', 'donor1', 'donor2', 'place1', 'place1-lat-long', 'place2', 'place2-lat-long', 'type']
    with open('my_file.csv', 'w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)
# Run the conversion only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment