Last active
June 9, 2018 13:48
-
-
Save devilholk/ee9797eb4f784ae4e75cd72bf39a9956 to your computer and use it in GitHub Desktop.
Incremental processing of bz2-compressed XML files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import bz2, time, functools, xml.etree.ElementTree | |
#serial reading of XML inspired by http://boscoh.com/programming/reading-xml-serially.html | |
#note that you can do elem.clear for every element | |
def format_amount(amount, precision=2, unit='B', base=1024, unit_prefix='i'):
    """Format *amount* as a short human-readable string with a size prefix.

    Amounts below ``base`` are printed without a prefix and without a space
    (``'512B'``). Larger amounts are divided by the largest power of ``base``
    (up to ``base ** 5``) and printed with a space, a K/M/G/T/P letter,
    ``unit_prefix`` and ``unit`` (``'1.5 KiB'``). Every value is rounded to
    ``precision`` decimal places.

    Args:
        amount: number to format (int or float).
        precision: number of decimal places passed to ``round``.
        unit: unit suffix, e.g. ``'B'`` or ``'B/s'``.
        base: step between prefixes (1024 for binary, 1000 for decimal).
        unit_prefix: text placed right after the prefix letter (``'i'`` -> KiB).

    Returns:
        The formatted string.
    """
    if amount < base:
        # Round here as well so fractional values (e.g. byte rates) honour
        # *precision* like the prefixed branches do; ints are unaffected.
        return f'{round(amount, precision)}{unit}'
    for exponent, prefix in enumerate('KMGT', start=1):
        if amount < base ** (exponent + 1):
            return f'{round(amount / base ** exponent, precision)} {prefix}{unit_prefix}{unit}'
    # Anything at or above base ** 5 is expressed in P units.
    return f'{round(amount / base ** 5, precision)} P{unit_prefix}{unit}'
class timed_data_stats:
    """Running counter that periodically prints its total and throughput.

    Feed it with ``stats += n``. When ``auto_report`` is true and more than
    ``temporal_auto_report_granuality`` seconds have passed since the last
    report, one line is printed using ``auto_report_format`` with the fields
    ``name``, ``total`` (formatted running total) and ``speed`` (formatted
    rate over the elapsed interval, or ``'N/A'`` if nothing was added).
    """

    def __init__(
        self,
        name='Statistics',
        temporal_auto_report_granuality=0.5,
        auto_report_format='{name} {total} @ {speed}.',
        auto_report=True,
        format_data_transfer_amount=format_amount,
        format_data_transfer_rate=functools.partial(format_amount, unit='B/s'),
    ):
        """Create an empty counter.

        Args:
            name: label substituted for ``{name}`` in reports.
            temporal_auto_report_granuality: minimum seconds between reports.
            auto_report_format: ``str.format`` template for a report line.
            auto_report: whether ``+=`` prints reports at all.
            format_data_transfer_amount: callable formatting the running total.
            format_data_transfer_rate: callable formatting the per-second rate.
        """
        self.last_checked = None  # time.time() of the last report; None until first +=
        self.temporal_auto_report_granuality = temporal_auto_report_granuality
        self.auto_report_format = auto_report_format
        self.auto_report = auto_report
        self.count = 0  # running total of everything added
        self.name = name
        self.last_count = 0  # value of count at the last report
        self.format_data_transfer_amount = format_data_transfer_amount
        self.format_data_transfer_rate = format_data_transfer_rate

    def __iadd__(self, other):
        """Add *other* to the total and print a report if one is due."""
        now = time.time()
        self.count += other
        if self.last_checked is None:
            # The first update starts the clock. What arrived with it came in
            # before the interval began, so exclude it from the first rate.
            self.last_checked = now
            self.last_count = self.count
            return self
        if not self.auto_report:
            return self
        delta_time = now - self.last_checked
        if delta_time <= self.temporal_auto_report_granuality:
            return self
        delta_data = self.count - self.last_count
        # The total is always known; only the rate is meaningless when
        # nothing arrived during the interval.
        total = self.format_data_transfer_amount(self.count)
        if delta_data:
            speed = self.format_data_transfer_rate(delta_data / delta_time)
        else:
            speed = 'N/A'
        print(self.auto_report_format.format(name=self.name, total=total, speed=speed))
        self.last_checked = now
        self.last_count = self.count
        return self
class incremental_bz2_decoder:
    """Minimal read-only file-like object that decompresses a .bz2 file lazily.

    It provides ``read`` (and ``close``), which is what
    ``xml.etree.ElementTree.iterparse`` needs from its source. Compressed
    ("Deflated") and decompressed ("Inflated") byte counts are added to two
    ``timed_data_stats`` counters for progress reporting.

    Files made of several concatenated bz2 streams are decoded as one
    continuous byte stream.
    """

    BUFSIZE = 1024 * 1024  # compressed bytes read from disk per step

    def __init__(self, filename, temporal_auto_report_granuality=0.5):
        """Prepare to decode *filename*; the file is opened on the first read."""
        self.filename = filename
        self.inflater = bz2.BZ2Decompressor()
        self.inflated_stats = timed_data_stats('Inflated processed', temporal_auto_report_granuality=temporal_auto_report_granuality)
        self.deflated_stats = timed_data_stats('Deflated processed', temporal_auto_report_granuality=temporal_auto_report_granuality)
        # Decompressed bytes not yet handed out by read(). A bytearray lets
        # read() drop consumed bytes from the front without copying the whole
        # multi-megabyte remainder on every small read.
        self.local_buffer = bytearray()
        self.reader = self._iterative_reader()

    def _iterative_reader(self):
        """Yield non-empty chunks of decompressed data, then stop at end of file.

        Raises:
            EOFError: if the file ends in the middle of a bz2 stream.
            OSError: if the data is not valid bz2 (raised by the decompressor).
        """
        with open(self.filename, 'rb') as infile:
            pending = b''  # compressed bytes waiting to be fed to the decompressor
            mid_stream = False  # True while a bz2 stream has started but not ended
            while True:
                if not pending:
                    pending = infile.read(self.BUFSIZE)
                    if not pending:
                        # End of file: stop rather than spinning on empty reads
                        # or feeding a finished decompressor (which raises).
                        if mid_stream:
                            raise EOFError('Compressed file ended before the end-of-stream marker was reached')
                        return
                    self.deflated_stats += len(pending)
                if self.inflater.eof:
                    # The previous stream is complete; what follows is a new,
                    # concatenated stream and needs a fresh decompressor.
                    self.inflater = bz2.BZ2Decompressor()
                chunk = self.inflater.decompress(pending)
                mid_stream = not self.inflater.eof
                # Bytes past the end of a finished stream (empty otherwise).
                pending = self.inflater.unused_data
                self.inflated_stats += len(chunk)
                if chunk:
                    yield chunk

    def read(self, amount=-1):
        """Return up to *amount* decompressed bytes.

        Fewer bytes are returned only when the data runs out, and ``b''`` once
        it is exhausted. That empty result is the end-of-input signal that
        ``iterparse`` relies on. A negative or ``None`` *amount* returns
        everything that is left.
        """
        if amount is None or amount < 0:
            for chunk in self.reader:
                self.local_buffer += chunk
            amount = len(self.local_buffer)
        else:
            while len(self.local_buffer) < amount:
                chunk = next(self.reader, None)
                if chunk is None:  # no more data; return what is buffered
                    break
                self.local_buffer += chunk
        result = bytes(self.local_buffer[:amount])
        del self.local_buffer[:amount]
        return result

    def close(self):
        """Stop decoding and close the underlying file if it is open."""
        # Closing the generator exits its ``with open(...)`` block.
        self.reader.close()
# Stream the compressed dump through the incremental decoder and count
# every parse event, printing progress periodically.
xml_stream = incremental_bz2_decoder('enwiktionary-20180601-pages-meta-current.xml.bz2', temporal_auto_report_granuality=2)
# Events use decimal prefixes. '/s' is passed as the unit (not the prefix)
# so rates below 1000 events/s are labelled too ('500.0/s', '1.5 K/s').
event_count = timed_data_stats(
    'Processed events',
    format_data_transfer_amount=functools.partial(format_amount, base=1000, unit_prefix='', unit=''),
    format_data_transfer_rate=functools.partial(format_amount, base=1000, unit_prefix='', unit='/s'),
)
root = None  # document root element, captured from the first 'start' event
depth = 0  # nesting depth of the currently open element (root == 1)
for event, elem in xml.etree.ElementTree.iterparse(xml_stream, events=('start', 'end', 'start-ns', 'end-ns')):
    event_count += 1
    if event == 'start':
        if root is None:
            root = elem
        depth += 1
    elif event == 'end':
        depth -= 1
        # Drop the finished element's children, text and attributes.
        elem.clear()
        if depth == 1:
            # elem was a direct child of the root. clear() empties it, but the
            # root still holds it, so empty shells (presumably one per <page>
            # in this dump) would pile up for the whole file. Detach them.
            del root[:]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment