@devilholk
Last active June 9, 2018 13:48
Incremental processing of bz2-compressed XML files
import bz2, time, functools, xml.etree.ElementTree
#serial reading of XML inspired by http://boscoh.com/programming/reading-xml-serially.html
#note that you can call elem.clear() on every finished element to keep memory usage bounded
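
#For reference (added sketch, not part of the original gist): the standard library can
#already stream a .bz2 file straight into iterparse, e.g.
#    with bz2.open('dump.xml.bz2', 'rb') as f:
#        for event, elem in xml.etree.ElementTree.iterparse(f, events=('end',)):
#            elem.clear()
#The incremental_bz2_decoder defined below does the same job, but additionally reports
#how much compressed and decompressed data has been processed and at what rate.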
def format_amount(amount, precision=2, unit='B', base=1024, unit_prefix='i'):
    if amount < base ** 1:
        return f'{amount}{unit}'
    elif amount < base ** 2:
        return f'{round(amount/(base**1), precision)} K{unit_prefix}{unit}'
    elif amount < base ** 3:
        return f'{round(amount/(base**2), precision)} M{unit_prefix}{unit}'
    elif amount < base ** 4:
        return f'{round(amount/(base**3), precision)} G{unit_prefix}{unit}'
    elif amount < base ** 5:
        return f'{round(amount/(base**4), precision)} T{unit_prefix}{unit}'
    else:
        return f'{round(amount/(base**5), precision)} P{unit_prefix}{unit}'
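
#Quick sanity check for format_amount (added example, not in the original gist):
#the asserts below illustrate the binary-prefix scaling and cost nothing to run inline.
assert format_amount(512) == '512B'
assert format_amount(1536) == '1.5 KiB'
assert format_amount(5 * 1024**3) == '5.0 GiB'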
class timed_data_stats:
    #accumulates a running count via += and periodically prints the total and the current rate
    def __init__(self, name='Statistics', temporal_auto_report_granuality=0.5, auto_report_format='{name} {total} @ {speed}.', auto_report=True, format_data_transfer_amount=format_amount, format_data_transfer_rate=functools.partial(format_amount, unit='B/s')):
        self.last_checked = None
        self.temporal_auto_report_granuality = temporal_auto_report_granuality
        self.auto_report_format = auto_report_format
        self.auto_report = auto_report
        self.count = 0
        self.name = name
        self.last_count = 0
        self.format_data_transfer_amount = format_data_transfer_amount
        self.format_data_transfer_rate = format_data_transfer_rate

    def __iadd__(self, other):
        now = time.time()
        self.count += other
        if self.last_checked is None:
            self.last_checked = now
        elif self.auto_report:
            delta_time = now - self.last_checked
            delta_data = self.count - self.last_count
            if delta_time > self.temporal_auto_report_granuality:
                if delta_data:
                    print(self.auto_report_format.format(name=self.name, total=self.format_data_transfer_amount(self.count), speed=self.format_data_transfer_rate(delta_data / delta_time)))
                else:
                    print(self.auto_report_format.format(name=self.name, total='N/A', speed='N/A'))
                self.last_checked = now
                self.last_count = self.count
        return self
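
#Usage sketch for timed_data_stats (added example, not in the original gist): accumulate
#byte counts with += and the instance prints '<name> <total> @ <rate>.' at most once per
#temporal_auto_report_granuality seconds. The sleeps are only there so a report is likely
#to fire in this tiny demo.
_demo_stats = timed_data_stats('Demo copy', temporal_auto_report_granuality=0.1)
for _ in range(4):
    _demo_stats += 256 * 1024
    time.sleep(0.05)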
class incremental_bz2_decoder:
    #file-like wrapper that decompresses a bz2 file in chunks and tracks throughput
    BUFSIZE = 1024*1024

    def __init__(self, filename, temporal_auto_report_granuality=0.5):
        self.filename = filename
        self.inflater = bz2.BZ2Decompressor()
        self.inflated_stats = timed_data_stats('Inflated processed', temporal_auto_report_granuality=temporal_auto_report_granuality)
        self.deflated_stats = timed_data_stats('Deflated processed', temporal_auto_report_granuality=temporal_auto_report_granuality)
        self.local_buffer = b''
        self.reader = self._iterative_reader()

    def _iterative_reader(self):
        with open(self.filename, 'br') as infile:
            while True:
                buf = infile.read(self.BUFSIZE)
                if not buf:
                    #end of file - stop the generator so read() can return a short result
                    break
                chunk = self.inflater.decompress(buf)
                self.deflated_stats += len(buf)
                self.inflated_stats += len(chunk)
                if chunk:
                    yield chunk

    def read(self, amount):
        #file-like read() used by iterparse; returns a short (possibly empty) result at end of file
        while amount > len(self.local_buffer):
            try:
                self.local_buffer += next(self.reader)
            except StopIteration:
                break
        if amount >= len(self.local_buffer):
            result = self.local_buffer
            self.local_buffer = b''
        else: #amount is < local buffer
            result = self.local_buffer[:amount]
            self.local_buffer = self.local_buffer[amount:]
        return result
xml_stream = incremental_bz2_decoder('enwiktionary-20180601-pages-meta-current.xml.bz2', temporal_auto_report_granuality=2)
event_count = timed_data_stats('Processed events', format_data_transfer_amount=functools.partial(format_amount, base=1000, unit_prefix='', unit=''), format_data_transfer_rate=functools.partial(format_amount, base=1000, unit_prefix='/s', unit=''))

for event, elem in xml.etree.ElementTree.iterparse(xml_stream, events=('start', 'end', 'start-ns', 'end-ns')):
    #print(event, elem)
    event_count += 1
    if event == 'end':
        #clear finished elements so the tree does not keep the whole dump in memory
        elem.clear()
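
#Sketch of a more useful loop body (added example, not in the original gist): instead of
#only counting events, data can be pulled out of specific elements before clearing them.
#Tag names in MediaWiki dumps are namespace-qualified, hence the endswith check:
#    for event, elem in xml.etree.ElementTree.iterparse(xml_stream, events=('end',)):
#        if elem.tag.endswith('}title'):
#            print(elem.text)
#        elem.clear()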