Created
September 3, 2017 01:19
-
-
Save tcurvelo/2693c2201b0cac8fcafbb552fc7ce55f to your computer and use it in GitHub Desktop.
CLI utility for splitting a big XML file into smaller parts.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3
"""
CLI utility for splitting a big XML file into smaller parts.

The breakpoint will be the element of depth 1 (directly below the root) that
does not fit in the specified maximum size. Each part created will have the
same root element.

Usage:
    $ xmlsplit size prefix [filename]

Eg:
    $ xmlsplit 1000000 part_ mydata.xml
or
    $ cat mydata.xml | xmlsplit 1000000 part_
"""
from xml.sax import saxutils, handler, make_parser | |
import sys | |
class SplitterContentHandler(handler.ContentHandler):
    """SAX content handler that re-serialises a document into several files.

    Events are accumulated in an in-memory buffer. Each time an element at
    depth 1 (a direct child of the root) is closed, the buffer is flushed to
    the current output file; if it would not fit under ``size`` bytes
    (UTF-8 encoded, including the closing root tag) the current file is
    finished and a new one is started with the same root start tag.

    Output files are named ``<prefix><n>.xml`` with ``n`` starting at 1.

    Args:
        size: maximum size of a part, in bytes.
        prefix: path prefix for the generated files.
    """

    def __init__(self, size, prefix):
        handler.ContentHandler.__init__(self)
        self._size = size
        self._prefix = prefix
        self._buffer(clean=True)
        # The encoding must be known before the first file is opened so the
        # bytes on disk match both the size accounting and the declaration.
        self._out_encoding = 'utf-8'
        self._xml_declaration = (
            '<?xml version="1.0" encoding="UTF-8" standalone="yes"?>\n')
        # root node will be at depth 0
        self._depth = -1
        self._root_tag = None
        self._end_tag = None
        self._part = 1
        self._current_size = 0
        # True once the current part holds at least one depth-1 element.
        self._part_has_element = False
        self._out = self._open_part()

    def _open_part(self):
        """Open the output file for the current part number."""
        return open(
            '{}{}.xml'.format(self._prefix, self._part),
            'w',
            encoding=self._out_encoding,
        )

    def _buffer(self, text='', clean=False):
        """Append ``text`` to the pending buffer (or reset it) and return it."""
        self.__buffer = '' if clean else self.__buffer + text
        return self.__buffer

    def _write(self, text=None):
        """Write ``text`` (default: the pending buffer) and count its bytes."""
        if text is None:
            text = self._buffer()
        self._current_size += len(text.encode(self._out_encoding))
        self._out.write(text)

    def _fits_buffer(self):
        """Return True if buffer plus closing root tag stay under the limit."""
        total_required = (
            self._current_size +
            len(self._buffer().encode(self._out_encoding)) +
            len(self._end_tag.encode(self._out_encoding))
        )
        return total_required < self._size

    def _rotate(self):
        """Close the current part and start the next one with the root tag."""
        self._write(self._end_tag)
        self._out.close()
        self._part += 1
        self._out = self._open_part()
        self._current_size = 0
        self._part_has_element = False
        self.startDocument()
        self._write(self._root_tag)

    def startDocument(self):
        self._write(self._xml_declaration)

    def startElement(self, name, attrs):
        self._depth += 1
        # quoteattr picks the quote character and escapes quotes, '&', '<',
        # '>' and whitespace control characters, so any value round-trips.
        attributes = ''.join(
            ' {}={}'.format(attr_name, saxutils.quoteattr(attr_value))
            for attr_name, attr_value in attrs.items()
        )
        tag = '<{}{}>'.format(name, attributes)
        if self._depth == 0:  # root
            self._root_tag = tag
            self._end_tag = '</{}>'.format(name)
        self._buffer(tag)

    def endElement(self, name):
        self._buffer('</{}>'.format(name))
        if self._depth == 1:
            # Never rotate an empty part: an element that is too big on its
            # own cannot be split, and in part 1 the root start tag is still
            # sitting in the buffer at this point.
            if self._part_has_element and not self._fits_buffer():
                self._rotate()
            self._write()
            self._buffer(clean=True)
            self._part_has_element = True
        self._depth -= 1

    def endDocument(self):
        # The buffer holds the closing root tag (and any trailing text).
        self._write()
        self._buffer(clean=True)
        self._out.close()

    def characters(self, content):
        self._buffer(saxutils.escape(content))

    def ignorableWhitespace(self, content):
        self._buffer(content)

    def processingInstruction(self, target, data):
        self._buffer('<?{} {}?>'.format(target, data))
def split(size, prefix, filename=None):
    """Split an XML document into parts of at most ``size`` bytes.

    Args:
        size: maximum part size in bytes; anything ``int()`` accepts
            (command-line arguments arrive as strings).
        prefix: path prefix handed to ``SplitterContentHandler`` for naming
            the output files.
        filename: path of the XML file to read; standard input is read when
            it is omitted or empty.

    Raises:
        ValueError: if ``size`` cannot be converted to an integer.
    """
    size = int(size)
    if filename:
        source = filename
    else:
        # Hand the parser the raw byte stream so it applies the encoding
        # from the XML declaration instead of the locale's text decoding.
        # Fall back to the stream itself if stdin has no binary layer.
        source = getattr(sys.stdin, 'buffer', sys.stdin)
    parser = make_parser()
    parser.setContentHandler(SplitterContentHandler(size, prefix))
    parser.parse(source)
if __name__ == '__main__':
    # split() takes two required arguments and one optional one; report a
    # wrong count as a usage error rather than a TypeError traceback.
    if not 2 <= len(sys.argv) - 1 <= 3:
        sys.exit('Usage: xmlsplit size prefix [filename]')
    split(*sys.argv[1:])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment