Skip to content

Instantly share code, notes, and snippets.

@mnot
Created December 27, 2009 10:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mnot/264238 to your computer and use it in GitHub Desktop.
Save mnot/264238 to your computer and use it in GitHub Desktop.
xop_parser.py - A demonstration XOP Parser for SAX
#!/usr/bin/env python
"""
xop_parser.py - A demonstration XOP Parser for SAX
Based upon: http://www.w3.org/2000/xp/Group/3/06/Attachments/XOP.html
THIS SOFTWARE IS EXPERIMENTAL - INTERFACES MAY CHANGE AT ANY TIME.
CAVEATS / TODO:
- requires python 2.3 AFAIK
- very little error handling
- only takes a file object in parse()
- REQUIRES namespaces on ContentHandler
- only supports MIME multipart/related packaging
- only Expat parser supported underneath
- need to check canonicalisation of base64
- potential for unnecessary decode/encode if Content-Transfer-Encoding is base64
- compares content-locations lexically (is this OK?)
- assumes that all media types ending in XOP_MT_SUFFIX are XOP Documents
"""
__license__ = """
Copyright (c) 2004 Mark Nottingham <mnot@pobox.com>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
Usage:
Use just as you would a xml.sax parser. ContentHandlers can have one extra
method, binary(), which is called when binary content is available. If binary()
is not defined, characters() will be, with the appropriate (encoded) content.
Note that even when you define binary(), you should still allow for such content
to be handled by characters(), in case it isn't binary in the XOP Package.
"""
__version__ = "0.22"
import email, base64
from xml.sax import expatreader, xmlreader, saxutils
from StringIO import StringIO
XOP_NS = 'http://www.w3.org/2003/12/xop/include'
XOP_MT_SUFFIX = '_xop+xml'
class XopFormatError(ValueError, AssertionError):
    """Raised when the input is not a usable XOP package.

    Also derives from AssertionError so that callers which caught the
    assertion failures raised by earlier versions of parse() keep working.
    """


class XopParser(expatreader.ExpatParser):
    """Expat-based SAX parser whose input is a MIME-packaged XOP document.

    parse() unpacks the multipart message, feeds the root XML part to the
    underlying Expat parser, and keeps the remaining parts indexed by
    Content-ID and Content-Location so that XopHandlerWrapper can resolve
    xop:Include references while the document is being parsed.
    """

    def parse(self, source):
        """Parse the XOP package read from the file object `source`.

        The root part is the one whose Content-ID equals the `start`
        parameter of the package's Content-Type; when there is no such
        parameter, or no part carries that ID, the first part is used.

        Raises XopFormatError if the message is not multipart, or if either
        the package's `type` parameter or the root part's content type does
        not end in XOP_MT_SUFFIX.
        """
        msg = email.message_from_file(source)
        # Explicit raises instead of assert: asserts vanish under "python -O".
        if not msg.is_multipart():
            raise XopFormatError("XOP package is not a MIME multipart message")
        if not self._isXopType(msg.get_param('type')):
            raise XopFormatError(
                "package 'type' parameter is missing or does not end in %r"
                % XOP_MT_SUFFIX)

        self._parts = msg.get_payload()
        # Parts lacking a header are filed under the key None, as before.
        self._partsByID = {}
        self._partsByCL = {}
        for part in self._parts:
            self._partsByID[part.get('Content-ID', None)] = part
            self._partsByCL[part.get('Content-Location', None)] = part

        # Test for None explicitly: _partsByID may itself contain a None key,
        # which must never be mistaken for the root part.
        rootID = msg.get_param('start')
        if rootID is not None and rootID in self._partsByID:
            rootPart = self._partsByID[rootID]
        else:
            rootPart = self._parts[0]
        if not self._isXopType(rootPart.get_content_type()):
            raise XopFormatError(
                "root part content type does not end in %r" % XOP_MT_SUFFIX)

        self._source = StringIO(rootPart.get_payload(decode=1))
        self.reset()
        self._cont_handler.setDocumentLocator(expatreader.ExpatLocator(self))
        return xmlreader.IncrementalParser.parse(self, self._source)

    def setContentHandler(self, handler):
        """Install `handler`, wrapped so that xop:Include elements are resolved."""
        return expatreader.ExpatParser.setContentHandler(
            self, XopHandlerWrapper(handler, self))

    def _isXopType(self, media_type):
        """Return True if `media_type` is a string ending in XOP_MT_SUFFIX.

        The comparison is done in lower case because media type names are
        case-insensitive; any non-string value (e.g. None for a missing
        parameter) is simply not a XOP type.
        """
        if not hasattr(media_type, 'lower'):
            return False
        return media_type.lower().endswith(XOP_MT_SUFFIX)
class XopHandlerWrapper:
    """ContentHandler proxy that replaces xop:Include elements with content.

    Every event is forwarded to the wrapped handler unchanged, except that
    an xop:Include element is swallowed and the MIME part it references is
    delivered in its place: through handler.binary(data) when the handler
    defines binary(), otherwise through handler.characters() as base64 text.
    """

    def __init__(self, handler, parser):
        """Wrap `handler`; `parser` supplies the _partsByID/_partsByCL indexes."""
        self._handler = handler
        self._parser = parser

    def __getattr__(self, attr):  # bit of a hack; maybe a metaclass instead?
        # Reached only for names not defined on the wrapper itself, so all
        # other handler methods and attributes pass straight through.
        return getattr(self._handler, attr)

    def startElementNS(self, name, qname, attrs):
        """Forward the event, or substitute the referenced part for xop:Include.

        Raises KeyError if an xop:Include has no `href` attribute or refers
        to a part that is not in the package.
        """
        if name != (XOP_NS, 'Include'):
            self._handler.startElementNS(name, qname, attrs)
            return
        data = self._include(attrs[(None, 'href')])
        if hasattr(self._handler, 'binary'):
            self._handler.binary(data)
        else:
            # encodestring() wraps its output in newline-terminated lines;
            # strip every newline so the text is the canonical, whitespace-free
            # base64 form rather than a line-broken one.
            self._handler.characters(
                base64.encodestring(data).replace('\n', ''))

    def endElementNS(self, name, qname):
        """Forward the event unless it closes an xop:Include element."""
        if name != (XOP_NS, 'Include'):
            self._handler.endElementNS(name, qname)

    def _include(self, href):
        """Return the decoded payload of the MIME part that `href` refers to.

        A "cid:" URL is looked up by Content-ID (stored with its enclosing
        angle brackets); any other href is matched literally against the
        parts' Content-Location values.
        """
        # NOTE(review): %-escapes in cid: URLs are not decoded before the
        # lookup - confirm whether producers ever emit escaped Content-IDs.
        if href.lower()[:4] == 'cid:':
            part = self._parser._partsByID["<%s>" % href[4:]]
        else:
            part = self._parser._partsByCL[href]
        return part.get_payload(decode=1)
def create_parser(*args, **kwargs):
    """Construct a XopParser, passing every argument on to its constructor.

    NOTE(review): presumably provided so this module can be handed to
    xml.sax.make_parser() as a parser module - confirm.
    """
    parser = XopParser(*args, **kwargs)
    return parser
def test(doc):
    """Parse the XOP package in file object `doc` and serialize it as XML 1.0.

    The events are fed to a default-constructed saxutils.XMLGenerator, so
    the Optimized Infoset is written to that generator's default output.
    """
    from xml.sax.handler import feature_namespaces
    from xml.sax.saxutils import XMLGenerator

    parser = XopParser()
    parser.setContentHandler(XMLGenerator())
    # The XOP handler wrapper only intercepts the namespace-aware events.
    parser.setFeature(feature_namespaces, 1)
    parser.parse(doc)
if __name__ == '__main__':
    # Sample XOP package used by the "-t" self-test.  MIME structure matters
    # here: the folded Content-Type continuation line must start with
    # whitespace, and an empty line must separate each header block from the
    # body that follows it, otherwise the package cannot be parsed.
    doc = """\
MIME-Version: 1.0
Content-Type: Multipart/Related;boundary=MIME_boundary;
    type=application/soap_xop+xml;start="<mymessage.xml@example.org>"
Content-Description: An XML document with my picture and signature in it

--MIME_boundary
Content-Type: application/soap_xop+xml; charset=UTF-8
Content-Transfer-Encoding: 8bit
Content-ID: <mymessage.xml@example.org>

<soap:Envelope
xmlns:soap='http://www.w3.org/2003/05/soap-envelope'
xmlns:xop='http://www.w3.org/2003/12/xop/include'
xmlns:xop-mime='http://www.w3.org/2003/12/xop/mime'>
<soap:Body>
<m:data xmlns:m='http://example.org/stuff'>
<m:photo xop-mime:content-type='image/png'>
<xop:Include href='http://example.org/me.png'/>
</m:photo>
<m:sig xop-mime:content-type='application/pkcs7-signature'>
<xop:Include href='cid:http://example.org/my.hsh'/>
</m:sig>
</m:data>
</soap:Body>
</soap:Envelope>
--MIME_boundary
Content-Type: image/png
Content-Transfer-Encoding: binary
Content-Location: http://example.org/me.png

// binary octets for png
--MIME_boundary
Content-Type: application/pkcs7-signature
Content-Transfer-Encoding: binary
Content-ID: <http://example.org/my.hsh>

// binary octets for signature
--MIME_boundary--
"""
    # import profile
    # profile.run('test(StringIO(doc))')
    import sys
    # "-t" as the last argument runs the built-in sample; otherwise a XOP
    # package is read from standard input.
    if sys.argv[-1] == '-t':
        test(StringIO(doc))
    else:
        test(sys.stdin)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment