Skip to content

Instantly share code, notes, and snippets.

@durdn
Created June 7, 2010 14:37
Show Gist options
  • Save durdn/428729 to your computer and use it in GitHub Desktop.
Save durdn/428729 to your computer and use it in GitHub Desktop.
parser.py
try:
from lxml import etree
except:
from xml.etree import ElementTree as etree
from datetime import datetime
class Record(object):
    """ Holds the data of one TMS artifact as plain instance attributes """

    def __init__(self, **kwds):
        # every keyword argument becomes an attribute with the same name
        vars(self).update(kwds)

    def __str__(self):
        # one tab-indented "name = repr(value)" line per attribute
        lines = []
        for name, val in vars(self).items():
            lines.append("\t%s = %r" % (name, val))
        return '\n'.join(lines)
class Author(object):
    """ Holds the data of one TMS author as plain instance attributes """

    def __init__(self, **kwds):
        # every keyword argument becomes an attribute with the same name
        vars(self).update(kwds)

    def __str__(self):
        # one tab-indented "name = repr(value)" line per attribute
        return '\n'.join("\t%s = %r" % pair for pair in vars(self).items())
class TMSParser(object):
    """ Can interpret messages coming back from a TMS server conforming to the
    Dublin Core standard as customized by Fabrique.
    Check the tms/xsd folder to get a feel for the xml format
    The parse method reads an xml response and creates a list of Record(s)
    """

    # Text type used for element content: ``unicode`` on Python 2, ``str``
    # on interpreters where ``unicode`` no longer exists.
    try:
        _text_type = unicode
    except NameError:
        _text_type = str

    def __init__(self, disable_validation=False):
        """ disable_validation: when truthy, validate() returns True without
        looking at the document """
        self.disable_validation = bool(disable_validation)

    def validate(self, xml_file, schema='apps/tms/xsd/fabrique-dc.cached.xsd'):
        """ Returns the result of validating xml_file against the xsd at
        path schema, or True straight away when validation is disabled """
        if self.disable_validation:
            return True
        valid, _doc = self._xml_parse_and_validate(xml_file, schema)
        return valid

    def _xml_parse_and_validate(self, xml_file, schema='apps/tms/xsd/fabrique-dc.cached.xsd'):
        """ Parses the xsd and the xml message and returns the tuple
        (validation result, parsed document).
        NOTE(review): etree.XMLSchema is an lxml API; the xml.etree fallback
        import of this file does not appear to provide it -- confirm.
        """
        # open() instead of the Python 2-only file(); the with block makes
        # sure the schema handle is closed again
        with open(schema, 'rb') as schema_file:
            xmlschema_doc = etree.parse(schema_file)
        xmlschema = etree.XMLSchema(xmlschema_doc)
        # parse xml message
        doc = etree.parse(xml_file)
        # return validation result and actual parsed response
        return xmlschema.validate(doc), doc

    def _xml_parse(self, xml_file, schema='apps/tms/xsd/fabrique-dc.cached.xsd'):
        """ Parses xml_file without validating it; schema is accepted but
        not used """
        return etree.parse(xml_file)

    @classmethod
    def _clean_text(cls, text):
        """ Returns text converted to the text type and stripped. An element
        without text (None) gives an empty string rather than 'None' """
        if text is None:
            return cls._text_type('')
        return cls._text_type(text).strip()

    @staticmethod
    def _remove_ns_attribute(attribute):
        """ Returns a new dict with the '{namespace}' prefix removed from
        every key of the attribute mapping """
        newattr = {}
        for key in attribute.keys():
            curly_end = key.find('}')
            # same rule as _remove_namespace: only strip when '}' is not
            # the very first character
            name = key[curly_end + 1:] if curly_end > 0 else key
            newattr[name] = attribute[key]
        return newattr

    @staticmethod
    def _remove_namespace(tag):
        """ Returns tag without its '{namespace}' prefix. A tag that has no
        find method is returned unchanged """
        try:
            curly_end = tag.find('}')
        except AttributeError:
            return tag
        if curly_end > 0:
            return tag[curly_end + 1:]
        return tag

    def _handle_single_with_translation(self, tags, element, record):
        """ For a tag listed in tags, stores the element text in a dict on
        record.<tag>, keyed by the element's lang attribute. Elements
        without a lang attribute are skipped """
        # remove namespace
        tag = self._remove_namespace(element.tag)
        if tag not in tags:
            return record
        lang = self._remove_ns_attribute(element.attrib).get('lang')
        if lang is None:
            # no language to file the text under: ignore the element
            return record
        text = self._clean_text(element.text)
        translations = getattr(record, tag, None)
        if translations is None:
            setattr(record, tag, {lang: text})
        else:
            translations[lang] = text
        return record

    def _handle_location(self, element, record):
        """ Stores the text of a location element in record.location and its
        type attribute in record.location_type. Raises KeyError when the
        type attribute is missing """
        tag = self._remove_namespace(element.tag)
        if tag == 'location':
            setattr(record, tag, self._clean_text(element.text))
            setattr(record, 'location_type',
                    self._remove_ns_attribute(element.attrib)['type'])
        return record

    def _handle_simple(self, tags, element, record):
        """ For a tag listed in tags, stores the stripped element text in
        record.<tag> """
        tag = self._remove_namespace(element.tag)
        if tag in tags:
            # simplest of types
            setattr(record, tag, self._clean_text(element.text))
        return record

    def _collect_urls(self, container, record):
        """ Appends one [url, {lang: title}, ...] list per child of container
        to the list stored on record under the name container.tag """
        for url in container:
            entries = getattr(record, container.tag, None)
            if entries is None:
                entries = []
                setattr(record, container.tag, entries)
            current = [self._clean_text(url.text)]
            entries.append(current)
            for title in url:
                if title.tag != 'title' or not title.text:
                    continue
                text = title.text.strip()
                lang = self._remove_ns_attribute(title.attrib).get('lang')
                if lang is not None:
                    current.append({lang: text})
                elif isinstance(current[-1], dict):
                    # title without lang: filed as 'nl' in the dict of the
                    # preceding title
                    current[-1]['nl'] = text
                else:
                    # title without lang and no earlier title dict: start
                    # one instead of indexing into the url string
                    current.append({'nl': text})

    def _handle_link(self, element, record):
        """ Collects the urls (and their titles) below a link element on the
        record, under the element's tag name """
        tag = self._remove_namespace(element.tag)
        if tag == 'link':
            self._collect_urls(element, record)
        return record

    def _handle_media(self, element, record):
        """ Handles a media element whose first child is primary-image: the
        text of that child's first child goes to record.primary_image and
        the urls of every following child are collected under that child's
        tag name. Media elements that are empty or do not start with
        primary-image are ignored.
        NOTE(review): when primary-image has no child, primary_image is set
        to '' and the remaining children are skipped -- reading of the
        original control flow, confirm.
        """
        tag = self._remove_namespace(element.tag)
        if tag != 'media' or len(element) == 0:
            return record
        primary = element[0]
        if primary.tag != 'primary-image':
            # no primary-image for this item: stop parsing media element
            return record
        attr_name = primary.tag.replace('-', '_')
        if len(primary) == 0:
            setattr(record, attr_name, '')
            return record
        setattr(record, attr_name, self._clean_text(primary[0].text))
        for rest in element[1:]:
            self._collect_urls(rest, record)
        return record

    def _handle_date(self, tags, element, record):
        """ For a tag listed in tags, stores the element text parsed with the
        format %Y-%m-%d as a datetime in record.<tag> """
        tag = self._remove_namespace(element.tag)
        if tag in tags:
            setattr(record, tag, datetime.strptime(element.text, "%Y-%m-%d"))
        return record

    def _handle_creator(self, element, record):
        """ Appends a dict with the keys name, role and lang for a creator
        element to the list record.creator. Raises KeyError when the role or
        lang attribute is missing """
        tag = self._remove_namespace(element.tag)
        if tag == 'creator':
            creator_list = getattr(record, tag, None)
            if creator_list is None:
                creator_list = []
                setattr(record, tag, creator_list)
            creator_list.append({'name': self._clean_text(element.text),
                                 'role': element.attrib['role'],
                                 'lang': self._remove_ns_attribute(element.attrib)['lang']})
        return record

    def _handle_list(self, tags, element, record):
        """ For a tag listed in tags, appends the stripped element text to
        the list record.<tag>, creating the list on first use """
        tag = self._remove_namespace(element.tag)
        # creates a list
        if tag in tags:
            text = self._clean_text(element.text)
            values = getattr(record, tag, None)
            if values is None:
                setattr(record, tag, [text])
            else:
                values.append(text)
        return record

    def _handle_list_with_translation(self, tags, element, record):
        """ For a tag listed in tags, appends the raw element text to the
        list stored under the element's lang in the dict record.<tag>.
        Raises KeyError when the lang attribute is missing """
        tag = self._remove_namespace(element.tag)
        # these types have translations
        if tag in tags:
            # look the language up first so that a missing lang does not
            # leave an empty dict behind on the record
            lang = self._remove_ns_attribute(element.attrib)['lang']
            per_lang = getattr(record, tag, None)
            if per_lang is None:
                per_lang = {}
                setattr(record, tag, per_lang)
            per_lang.setdefault(lang, []).append(element.text)
        return record

    def _parse_record(self, xml_record):
        """ Builds a Record from the children of xml_record[1][0] by offering
        every child to each of the handlers in turn """
        record = Record()
        nested_record = xml_record[1][0]
        for element in nested_record:
            # refactor this to have a big if switch that dispatches to these functions?
            record = self._handle_media(element, record)
            record = self._handle_link(element, record)
            record = self._handle_single_with_translation(
                ('title', 'description', 'temporal', 'background', 'objectname'),
                element, record)
            record = self._handle_simple(
                ('type', 'spatial', 'resources', 'credits', 'format', 'source',
                 'subject', 'departement', 'century', 'identifier'),
                element, record)
            record = self._handle_date(('date', 'dateSubmitted'), element, record)
            record = self._handle_list(('relation', 'dimensions'), element, record)
            record = self._handle_creator(element, record)
            record = self._handle_location(element, record)
            record = self._handle_list_with_translation(
                ('instructionalMethod', 'hasPart', 'coverage',
                 'bibliographicCitation', 'alternative'),
                element, record)
        return record

    def parse(self, xml_file):
        """ Converts a xml response from TMS in a list of Record(s).
        The third child of the root holds the records; the texts of children
        1 to 3 of the root's second child are attached as meta to the first
        record, if there is one """
        root = self._xml_parse(xml_file).getroot()
        resultset = root[2]
        request = root[1]
        meta = [request[1].text, request[2].text, request[3].text]
        result = [self._parse_record(r) for r in resultset]
        if result:
            result[0].meta = meta
        return result
class TMSAuthorParser(object):
    """ Can interpret messages coming back from a TMS server describing
    Authors and Creators
    Check the tms/sample folder to get a feel for the xml format
    The parse method reads an xml response and creates a list of Author(s)
    """

    # Text type used for element content: ``unicode`` on Python 2, ``str``
    # on interpreters where ``unicode`` no longer exists.
    try:
        _text_type = unicode
    except NameError:
        _text_type = str

    def __init__(self, disable_validation=False):
        """ disable_validation: when truthy, validate() returns True without
        looking at the document """
        self.disable_validation = bool(disable_validation)

    def validate(self, xml_file, schema='apps/tms/xsd/fabrique-dc.cached.xsd'):
        """ Returns the result of validating xml_file against the xsd at
        path schema, or True straight away when validation is disabled """
        if self.disable_validation:
            return True
        valid, _doc = self._xml_parse_and_validate(xml_file, schema)
        return valid

    def _xml_parse_and_validate(self, xml_file, schema='apps/tms/xsd/fabrique-dc.cached.xsd'):
        """ Parses the xsd and the xml message and returns the tuple
        (validation result, parsed document).
        NOTE(review): etree.XMLSchema is an lxml API; the xml.etree fallback
        import of this file does not appear to provide it -- confirm.
        """
        # open() instead of the Python 2-only file(); the with block makes
        # sure the schema handle is closed again
        with open(schema, 'rb') as schema_file:
            xmlschema_doc = etree.parse(schema_file)
        xmlschema = etree.XMLSchema(xmlschema_doc)
        # parse xml message
        doc = etree.parse(xml_file)
        # return validation result and actual parsed response
        return xmlschema.validate(doc), doc

    def _xml_parse(self, xml_file, schema='apps/tms/xsd/fabrique-dc.cached.xsd'):
        """ Parses xml_file without validating it; schema is accepted but
        not used """
        return etree.parse(xml_file)

    @classmethod
    def _clean_text(cls, text):
        """ Returns text converted to the text type and stripped. An element
        without text (None) gives an empty string rather than 'None' """
        if text is None:
            return cls._text_type('')
        return cls._text_type(text).strip()

    @staticmethod
    def _remove_ns_attribute(attribute):
        """ Returns a new dict with the '{namespace}' prefix removed from
        every key of the attribute mapping """
        newattr = {}
        for key in attribute.keys():
            curly_end = key.find('}')
            # same rule as _remove_namespace: only strip when '}' is not
            # the very first character
            name = key[curly_end + 1:] if curly_end > 0 else key
            newattr[name] = attribute[key]
        return newattr

    @staticmethod
    def _remove_namespace(tag):
        """ Returns tag without its '{namespace}' prefix. A tag that has no
        find method is returned unchanged """
        try:
            curly_end = tag.find('}')
        except AttributeError:
            return tag
        if curly_end > 0:
            return tag[curly_end + 1:]
        return tag

    def _handle_single_with_translation(self, tags, element, author):
        """ For a tag listed in tags, stores the element text in a dict on
        author.<tag>, keyed by the element's lang attribute.
        Raises KeyError when the lang attribute is missing """
        # remove namespace
        tag = self._remove_namespace(element.tag)
        if tag in tags:
            # these types have translations
            lang = self._remove_ns_attribute(element.attrib)['lang']
            text = self._clean_text(element.text)
            translations = getattr(author, tag, None)
            if translations is None:
                setattr(author, tag, {lang: text})
            else:
                translations[lang] = text
        return author

    def _handle_simple(self, tags, element, author):
        """ For a tag listed in tags, stores the stripped element text in
        author.<tag> """
        tag = self._remove_namespace(element.tag)
        if tag in tags:
            # simplest of types
            setattr(author, tag, self._clean_text(element.text))
        return author

    def _parse_author(self, xml_author):
        """ Builds an Author from the children of xml_author[1][0] by
        offering every child to each of the handlers in turn """
        author = Author()
        nested_author = xml_author[1][0]
        for element in nested_author:
            author = self._handle_single_with_translation(('biografie',), element, author)
            author = self._handle_simple(
                ('ConstituentID', 'ccidentifier', 'DisplayDate', 'DisplayName',
                 'BeginDate', 'EndDate', 'FirstName', 'LastName'),
                element, author)
        return author

    def parse(self, xml_file):
        """ Converts a xml response from TMS in a list of Author(s); the
        authors are read from the third child of the document root """
        resultset = self._xml_parse(xml_file).getroot()[2]
        return [self._parse_author(r) for r in resultset]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment