Skip to content

Instantly share code, notes, and snippets.

@tcurvelo
Created September 3, 2017 01:19
Show Gist options
  • Save tcurvelo/2693c2201b0cac8fcafbb552fc7ce55f to your computer and use it in GitHub Desktop.
Save tcurvelo/2693c2201b0cac8fcafbb552fc7ce55f to your computer and use it in GitHub Desktop.
CLI utility for splitting a big XML file into smaller parts.
#!/usr/bin/env python3
"""
CLI utility for splitting a big XML file into smaller parts.
The breakpoint will be the element of depth 1 (below root) that does not
fit in the specified maximum size. Each part created will have the same
root element.
Usage:
$ xmlsplit size prefix [filename]
Eg:
$ xmlsplit 1000000 part_ mydata.xml
or
$ cat mydata.xml | xmlsplit 1000000 part_
"""
from xml.sax import saxutils, handler, make_parser
import sys
class SplitterContentHandler(handler.ContentHandler):
    """SAX handler that re-serialises a document into size-limited parts.

    Every part is written to ``<prefix><n>.xml`` (n starting at 1), begins
    with the same XML declaration and root start tag, and is closed with the
    root end tag. Parts are only ever broken between elements of depth 1
    (direct children of the root).

    Args:
        size: maximum size of a part, in bytes of UTF-8 encoded output.
        prefix: path prefix of the generated files.
    """

    def __init__(self, size, prefix):
        super().__init__()
        self._size = size
        self._prefix = prefix
        self._buffer(clean=True)
        self._out_encoding = 'utf-8'
        self._xml_declaration = (
            '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\n')
        self._part = 1
        self._out = self._open_part()
        self._current_size = 0
        # True once the current part holds at least one depth-1 element.
        self._part_has_elements = False
        # root node will be at depth 0
        self._depth = -1
        self._root_tag = None
        self._end_tag = None

    def _open_part(self):
        """Open the output file of the current part number for writing."""
        # The encoding is explicit so that the bytes on disk match both the
        # XML declaration and the sizes computed with ``_out_encoding``.
        return open(
            '{}{}.xml'.format(self._prefix, self._part),
            'w',
            encoding=self._out_encoding,
        )

    def _buffer(self, text='', clean=False):
        """Append ``text`` to the pending buffer (or reset it) and return it."""
        self.__buffer = '' if clean else self.__buffer + text
        return self.__buffer

    def _write(self, text=None):
        """Write ``text`` (or the pending buffer if empty) to the current part."""
        if not text:
            text = self._buffer()
        self._current_size += len(text.encode(self._out_encoding))
        self._out.write(text)

    def _fits_buffer(self):
        """Tell whether the buffer plus the root end tag still fit the part."""
        total_required = (
            self._current_size +
            len(self._buffer().encode(self._out_encoding)) +
            len(self._end_tag.encode(self._out_encoding))
        )
        return total_required < self._size

    def _rotate(self):
        """Close the current part and start the next one with the same root."""
        self._write(self._end_tag)
        self._out.close()
        self._part += 1
        self._out = self._open_part()
        self._current_size = 0
        self._part_has_elements = False
        self.startDocument()
        self._write(self._root_tag)

    def startDocument(self):
        """Emit the XML declaration at the top of the current part."""
        self._write(self._xml_declaration)

    def startElement(self, name, attrs):
        """Buffer a start tag; the root start tag is written immediately."""
        self._depth += 1
        # quoteattr() escapes quotes as well as &, < and >, and supplies the
        # surrounding quote characters itself.
        attributes = ''.join(
            ' {}={}'.format(attr_name, saxutils.quoteattr(value))
            for (attr_name, value) in attrs.items()
        )
        tag = '<{}{}>'.format(name, attributes)
        self._buffer(tag)
        if self._depth == 0:  # root
            self._root_tag = tag
            self._end_tag = '</{}>'.format(name)
            # Flush the root tag (and anything that preceded it) right away.
            # _rotate() re-emits the root tag itself, so leaving it in the
            # buffer would duplicate it in the next part and leave the first
            # part without a start tag.
            self._write()
            self._buffer(clean=True)

    def endElement(self, name):
        """Buffer an end tag and flush when a depth-1 element is complete."""
        self._buffer('</{}>'.format(name))
        if self._depth == 1:
            # Only rotate when the current part already holds an element;
            # an element larger than the limit is written alone instead of
            # leaving an empty part behind.
            if self._part_has_elements and not self._fits_buffer():
                self._rotate()
            self._write()
            self._buffer(clean=True)
            self._part_has_elements = True
        self._depth -= 1

    def endDocument(self):
        """Write what is left in the buffer and close the last part."""
        self._write()
        self._buffer(clean=True)
        self._out.close()

    def characters(self, content):
        """Buffer character data, escaping XML special characters."""
        self._buffer(saxutils.escape(content))

    def ignorableWhitespace(self, content):
        """Buffer ignorable whitespace unchanged."""
        self._buffer(content)

    def processingInstruction(self, target, data):
        """Buffer a processing instruction."""
        self._buffer('<?{} {}?>'.format(target, data))
def split(size, prefix, filename=None):
    """Split an XML document into parts of at most ``size`` bytes.

    Args:
        size: maximum part size in bytes; anything ``int()`` accepts, so the
            raw command-line string can be passed directly.
        prefix: path prefix of the generated ``<prefix><n>.xml`` files.
        filename: path of the XML file to read; standard input is read when
            it is omitted or empty.
    """
    size = int(size)
    source = filename if filename else sys.stdin
    content_handler = SplitterContentHandler(size, prefix)
    parser = make_parser()
    parser.setContentHandler(content_handler)
    try:
        parser.parse(source)
    finally:
        # Make sure the part being written is flushed and closed even when
        # parsing fails half-way; closing an already closed file is a no-op.
        content_handler._out.close()
if __name__ == '__main__':
    # split() takes (size, prefix[, filename]); any other argument count
    # prints the module usage text and exits with a non-zero status instead
    # of failing with a TypeError traceback.
    cli_args = sys.argv[1:]
    if len(cli_args) not in (2, 3):
        sys.exit(__doc__)
    split(*cli_args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment