Skip to content

Instantly share code, notes, and snippets.

@jermnelson
Last active August 29, 2015 14:18
Show Gist options
  • Save jermnelson/2129abdde3da61506c44 to your computer and use it in GitHub Desktop.
Save jermnelson/2129abdde3da61506c44 to your computer and use it in GitHub Desktop.
config.yaml
*.db
*.mrc

About

A quick script that iterates through Tutt Library's MARC21 records and queries the ILS for each item's total number of checkouts; if checkouts > 10, the record is saved to popular-record.mrc. The results are saved to a SQLite database.

# Circulation usage harvester: reads MARC21 records, looks up per-item
# checkout data over HTTP, stores it in a SQLite database, and collects
# heavily used records so they can be written to a separate MARC file.
__author__ = "Jeremy Nelson"
__license__ = "GPLv3"
import argparse
import datetime
import os
import pymarc
import requests
import sqlalchemy
import sqlalchemy.ext.declarative
import sys
import yaml
import xml.etree.ElementTree as etree
# Shared declarative base: the ORM models in this file register their tables
# on Base.metadata, which setup() uses to create the schema.
Base = sqlalchemy.ext.declarative.declarative_base()
class Record(Base):
    """ORM model for one row of the ``records`` table.

    Holds the bib number of a MARC record together with the item number and
    the inventory / last-updated dates stored for it.
    """
    __tablename__ = 'records'
    # Surrogate primary key.
    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
    # Bib number taken from the MARC record.
    bib_number = sqlalchemy.Column(sqlalchemy.String(20))
    inventory_date = sqlalchemy.Column(sqlalchemy.Date)
    item_number = sqlalchemy.Column(sqlalchemy.String(20))
    last_updated = sqlalchemy.Column(sqlalchemy.Date)

    def __repr__(self):
        """Return a short debugging representation of the record."""
        # The closing ")>" mirrors Usage.__repr__; the old string ended in
        # a stray "." and left the bracket unbalanced.
        return "<Record(bib='{}', item={})>".format(
            self.bib_number,
            self.item_number)
class Usage(Base):
    """ORM model for one row of the ``record_usage`` table.

    Each row is a usage snapshot (checkout count and last check-in date)
    linked to a ``Record`` through ``record_id``.
    """
    __tablename__ = 'record_usage'
    # Surrogate primary key.
    id = sqlalchemy.Column(sqlalchemy.Integer, primary_key=True)
    # Filled in automatically with the UTC time at insert.
    created_on = sqlalchemy.Column(
        sqlalchemy.DateTime,
        default=datetime.datetime.utcnow)
    last_checkin = sqlalchemy.Column(sqlalchemy.Date)
    record_id = sqlalchemy.Column(
        sqlalchemy.Integer,
        sqlalchemy.ForeignKey('records.id'))
    # NOTE(review): the backref is named 'records' although it exposes the
    # Usage rows of a Record; the name is kept so existing callers still work.
    record = sqlalchemy.orm.relationship(
        "Record",
        backref=sqlalchemy.orm.backref('records', order_by=id))
    # Checkout count for the record.
    usage = sqlalchemy.Column(sqlalchemy.Integer)

    def __repr__(self):
        """Return a short debugging representation of the usage row."""
        # A Usage that is not linked to a Record yet must still be printable,
        # so fall back to None instead of raising AttributeError.
        bib_number = None
        if self.record is not None:
            bib_number = self.record.bib_number
        return "<Usage(record bib={}, usage={}, last_checkin={})>".format(
            bib_number,
            self.usage,
            self.last_checkin)
# Load the harvester settings from the config.yaml that sits next to this
# script; retrieve_info() reads the 'iii_url' entry from it.
with open(
        os.path.join(
            os.path.dirname(os.path.abspath(__file__)),
            "config.yaml")) as config_file:
    # safe_load builds only plain Python objects and needs no Loader
    # argument; a bare yaml.load(stream) can construct arbitrary objects and
    # is rejected outright by newer PyYAML releases.
    # An empty file parses to None, so fall back to {} to keep CONFIG.get()
    # usable.
    CONFIG = yaml.safe_load(config_file) or {}
def get_info(record):
    """Look up circulation details for the item attached to a MARC record.

    Args:
        record: MARC record object providing ``get_fields``; its 945 fields
            are scanned for an item number in subfield ``y``.

    Returns:
        dict: The result of ``retrieve_info`` for the item, with the item
        number stored under both ``info['item_number']`` and
        ``info['item']['item_number']``. An empty dict is returned when no
        945 field carries a subfield ``y``. When several 945 fields
        qualify, only the data of the last one is returned.
    """
    info = {}
    for field in record.get_fields('945'):
        if 'y' not in field:
            continue
        # The stored value is trimmed by one character on each side.
        item_id = field['y'][1:-1]
        info = retrieve_info(item_id)
        info['item_number'] = item_id
        # process_marc builds the Record row from info['item'], so the item
        # number must live there as well or Record.item_number stays empty.
        info.setdefault('item', {})['item_number'] = item_id
    return info
def get_date(date_string):
    """Parse an ``MM-DD-YY`` or ``MM-DD-YYYY`` string into a datetime.

    Args:
        date_string: Raw date text; surrounding whitespace is ignored.
            ``None`` is accepted and treated as "no date".

    Returns:
        datetime.datetime or None: The parsed value, or ``None`` when the
        input is missing, has an unexpected length, or cannot be parsed.
    """
    # An XML element without text hands us None rather than a string.
    if date_string is None:
        return None
    date_string = date_string.strip()
    # The length decides between a two-digit and a four-digit year.
    patterns = {8: '%m-%d-%y', 10: '%m-%d-%Y'}
    pattern = patterns.get(len(date_string))
    if pattern is None:
        return None
    try:
        return datetime.datetime.strptime(date_string, pattern)
    except ValueError:
        # Right length but not a real date: treat it like any other
        # unrecognised value instead of aborting the caller.
        return None
def _find_text(item_xml, path):
    """Return the text at *path*: '' when the element is empty, None when absent."""
    node = item_xml.find(path)
    if node is None:
        return None
    return node.text or ""


def retrieve_info(item_id, timeout=30):
    """Fetch usage and inventory details for one item over HTTP.

    The URL is built from the ``iii_url`` entry of ``CONFIG`` followed by
    ``=`` and the item id, and the response body is parsed as XML.

    Args:
        item_id: Item identifier appended to the configured URL.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        dict: Always contains the keys ``'item'`` and ``'usage'``.
        ``info['usage']`` may hold ``'usage'`` (int checkout count) and
        ``'last_checkin'``; ``info['item']`` may hold ``'inventory_date'``
        and ``'last_updated'``. When the request fails, the server answers
        with a status of 400 or above, or the body is not well-formed XML,
        ``info['error']`` describes the problem and both sub-dicts stay
        empty.
    """
    info = {'item': {}, 'usage': {}}
    tiger_url = "{}={}".format(CONFIG.get('iii_url'), item_id)
    try:
        # Without a timeout a stalled server would block the harvest forever.
        result = requests.get(tiger_url, timeout=timeout)
    except requests.RequestException as error:
        info['error'] = str(error)
        return info
    if result.status_code >= 400:
        info['error'] = "{}\n\n{}".format(result.status_code, result.text)
        return info
    try:
        item_xml = etree.XML(result.text)
    except etree.ParseError as error:
        info['error'] = "{}\n\n{}".format(error, result.text)
        return info

    fixed_field = "TYPEINFO/ITEM/FIXFLD[FIXLABEL='{}']/FIXVALUE"

    total_checkouts = _find_text(item_xml, fixed_field.format('TOT CHKOUT'))
    # Skip an empty value instead of letting int() raise on it.
    if total_checkouts is not None and total_checkouts.strip():
        info['usage']['usage'] = int(total_checkouts)

    last_checkin = _find_text(item_xml, fixed_field.format('LCHKIN'))
    if last_checkin is not None:
        info['usage']['last_checkin'] = get_date(last_checkin)

    inventory_date = _find_text(item_xml, fixed_field.format('INVDA'))
    if inventory_date is not None:
        info['item']['inventory_date'] = get_date(inventory_date)

    last_updated = _find_text(item_xml, "RECORDINFO/LASTUPDATEDATE")
    if last_updated is not None:
        info['item']['last_updated'] = get_date(last_updated)
    return info
def add_popular(marc):
    """Append *marc* to the module-level ``popular`` list.

    The list is created by ``setup()`` and later written out by
    ``save_popular()``.
    """
    popular.append(marc)
def add_record(**kwargs):
    """Return the Record for a bib number, creating it when missing.

    Args:
        **kwargs: Column values for ``Record``; ``bib_number`` is used to
            look for an existing row.

    Returns:
        Record: The existing row with the same ``bib_number`` if there is
        one, otherwise a newly added and committed row.
    """
    # filter_by accepts keyword arguments only; the earlier lookup passed the
    # bib number positionally and raised TypeError whenever the row existed.
    # A single first() both tests for and fetches the row.
    existing = session.query(Record).filter_by(
        bib_number=kwargs.get('bib_number')).first()
    if existing is not None:
        return existing
    record = Record(**kwargs)
    session.add(record)
    session.commit()
    return record
def add_usage(**kwargs):
    """Store a usage snapshot unless an equivalent one is already saved.

    A snapshot counts as already saved when a ``Usage`` row with the same
    ``record`` and ``last_checkin`` exists.

    Args:
        **kwargs: Column values for ``Usage``.
    """
    matching = session.query(Usage).filter_by(
        record=kwargs.get('record'),
        last_checkin=kwargs.get('last_checkin'))
    if matching.count() >= 1:
        return
    session.add(Usage(**kwargs))
    session.commit()
def process_marc(marc):
    """Record the usage of a single MARC record.

    Reads the bib number from field 907 subfield ``a``, makes sure a
    ``Record`` row exists for it, stores a usage snapshot when the item has
    at least one checkout, and queues the MARC record as popular when it
    has more than nine.

    Args:
        marc: The MARC record to process.
    """
    bib_field = marc['907']
    # The stored value is trimmed by one character on each side.
    bib_number = bib_field['a'][1:-1]
    record = session.query(Record).filter_by(bib_number=bib_number).first()
    info = get_info(marc)
    if record is None:
        item_details = info.get('item', {})
        item_details['bib_number'] = bib_number
        record = add_record(**item_details)
    usage_details = info.setdefault('usage', {})
    checkouts = usage_details.get('usage', 0)
    if checkouts > 0:
        usage_details['record'] = record
        add_usage(**usage_details)
    if checkouts > 9:
        add_popular(marc)
def process_shard(file_path):
    """Process every record of a MARC21 file and report progress on stdout.

    Each record is handed to ``process_marc``; records that fail with a
    ``UnicodeDecodeError`` are reported and skipped. Afterwards the popular
    records are saved and a summary line is written.

    Args:
        file_path: Path of the MARC21 file to read.
    """
    i = 0
    start = datetime.datetime.utcnow()
    # The context manager closes the input file even if processing aborts.
    with open(file_path, 'rb') as marc_file:
        marc_reader = pymarc.MARCReader(marc_file, to_unicode=True)
        while True:
            try:
                process_marc(next(marc_reader))
            except UnicodeDecodeError:
                sys.stdout.write(
                    "UnicodeDecodeError with record {} in {}".format(
                        i, file_path))
            except StopIteration:
                break
            # Progress markers: a dot every 10 records, the running count
            # every 100, and a timestamp every 1000.
            if not i % 10 and i > 0:
                sys.stdout.write(".")
            if not i % 100:
                sys.stdout.write(str(i))
            if not i % 1000 and i > 0:
                now = datetime.datetime.utcnow()
                # total_seconds() keeps whole days that .seconds would drop.
                sys.stdout.write("{} total secs={}".format(
                    now.isoformat(),
                    int((now - start).total_seconds())))
            i += 1
    try:
        save_popular()
    except Exception as error:
        # Saving stays best-effort, but the reason is now reported and
        # KeyboardInterrupt / SystemExit are no longer swallowed.
        print("Failed to save popular {}: {}".format(len(popular), error))
    end = datetime.datetime.utcnow()
    sys.stdout.write(
        "\nFinished processing {} at {}, total time {} minutes for {}".format(
            file_path,
            end.isoformat(),
            (end - start).total_seconds() / 60.0,
            i))
def save_popular():
    """Append the collected popular records to ``tutt-popular.mrc``.

    The file lives next to this script and is created when it does not
    exist. Entries of ``popular`` that are not ``pymarc.Record`` objects
    are ignored; records that cannot be encoded are reported on stderr and
    skipped.
    """
    tutt_mrc_path = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        "tutt-popular.mrc")
    # The earlier version re-read the file in text mode before rewriting it.
    # That failed when the file did not exist yet, and the lines it read were
    # plain text rather than records, so everything saved before was dropped
    # by the rewrite. Appending in binary mode keeps the earlier records.
    with open(tutt_mrc_path, "ab") as tutt_popular:
        writer = pymarc.MARCWriter(tutt_popular)
        for rec in popular:
            if not isinstance(rec, pymarc.Record):
                continue
            try:
                writer.write(rec)
            except UnicodeEncodeError as error:
                # Still best-effort, but no longer silent.
                sys.stderr.write(
                    "Skipping popular record that cannot be encoded: "
                    "{}\n".format(error))
def setup():
    """Initialise the module-level state used by the harvester.

    Binds the globals ``popular`` (an empty list), ``engine`` (the SQLite
    engine for ``circ_usage.db``) and ``session`` (an ORM session bound to
    that engine), creating any missing tables on the way.
    """
    global engine, session, popular
    popular = list()
    engine = sqlalchemy.create_engine("sqlite:///circ_usage.db")
    Base.metadata.create_all(engine)
    session_factory = sqlalchemy.orm.sessionmaker(bind=engine)
    session = session_factory()
if __name__ == '__main__':
    # Command-line entry point: harvest usage for the given MARC21 file.
    cli = argparse.ArgumentParser(description='TIGER Usage Harvester')
    cli.add_argument('marc_filepath', help='MARC21 file path')
    marc_filepath = cli.parse_args().marc_filepath
    setup()
    process_shard(marc_filepath)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment