Created May 15, 2018 19:17
-
-
Save peterbe/a11c1419c9e3b61235280ebd22febef8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio
import codecs
import csv
import datetime
import zlib
from glob import glob

import aiofiles
CHUNK_SIZE = 1024 * 256  # 256 KB of compressed input per read

# Rows whose timestamp is newer than this count as "recent" (6 * 30 days ago).
# Timestamps compared against this cutoff are parsed from strings ending in
# 'Z' (UTC) into *naive* datetimes, so the cutoff is also a naive UTC
# datetime. Using naive local time here would skew the boundary by the local
# UTC offset.
cutoff = (
    datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
    - datetime.timedelta(days=6 * 30)
)
async def read_csv(input_generator):
    """Asynchronously yield parsed CSV rows (lists of strings).

    The raw byte chunks are first regrouped into batches of complete text
    lines by ``split_lines``; each batch is then fed to ``csv.reader``.

    :param input_generator: async generator of raw bytes
    """
    batches = split_lines(input_generator)
    async for batch in batches:
        for parsed_row in csv.reader(batch):
            yield parsed_row
async def split_lines(stream):
    """Split an async stream of UTF-8 byte chunks into lists of text lines.

    Each yielded value is a non-empty list of complete lines (without the
    trailing ``\\n``). A partial line at the end of a chunk is carried over
    and joined with the start of the next chunk. Newlines at the start of
    the carried-over text are stripped, so those blank lines are not emitted.

    :param stream: async iterable of ``bytes`` chunks
    :raises UnicodeDecodeError: if the data is not valid UTF-8, including a
        truncated multibyte sequence at the very end of the stream
    """
    # An incremental decoder buffers a multibyte UTF-8 sequence that is split
    # across two chunks; calling bytes.decode() per chunk would raise there.
    decoder = codecs.getincrementaldecoder('utf-8')()
    leftover = ''
    async for chunk in stream:
        text = (leftover + decoder.decode(chunk)).lstrip('\n')
        lines = text.split('\n')
        # Everything after the last \n belongs to the next line.
        leftover = lines.pop()
        if lines:
            yield lines
    # final=True flushes the decoder and raises on an incomplete sequence.
    leftover += decoder.decode(b'', final=True)
    # Emit the last line even when the data does not end with a newline.
    if leftover:
        yield [leftover]
async def stream(loop, fn, chunk_size=CHUNK_SIZE):
    """Yield decompressed bytes from the gzip file *fn*, one chunk at a time.

    :param loop: event loop; accepted for interface compatibility, not used
    :param fn: path of a gzip-compressed file
    :param chunk_size: number of compressed bytes to read per iteration
    """
    # wbits = MAX_WBITS | 16 makes zlib expect a gzip header and trailer.
    decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
    async with aiofiles.open(fn, 'rb') as fh:
        while True:
            compressed = await fh.read(chunk_size)
            if not compressed:
                break
            data = decompressor.decompress(compressed)
            if data:
                yield data
    # Once all input has been fed, flush() returns any output zlib still holds.
    # NOTE(review): only the first gzip member is decoded; bytes after its end
    # are left in decompressor.unused_data and ignored.
    tail = decompressor.flush()
    if tail:
        yield tail
async def count(loop, fn, i):
    """Count the rows in one gzipped CSV file and how many are recent.

    A row is "recent" when the timestamp in its 4th column (index 3) is
    later than the module-level ``cutoff``.
    NOTE(review): presumably column 3 is a last-modified date — confirm
    against the data source.

    :param loop: event loop, passed through to ``stream``
    :param fn: path of the ``.csv.gz`` file to scan
    :param i: index of the file, used only for progress output
    :returns: ``(total, recent)`` row counts
    :raises ValueError: if a timestamp does not match the expected format
    """
    recent = total = 0
    print(i, fn)
    async for row in read_csv(stream(loop, fn)):
        # csv.reader yields [] for a blank line; there is no timestamp to read.
        if not row:
            continue
        last_modified = datetime.datetime.strptime(
            row[3],
            '%Y-%m-%dT%H:%M:%S.%fZ'
        )
        if last_modified > cutoff:
            recent += 1
        total += 1
    return total, recent
async def run(loop):
    """Scan the ``*.csv.gz`` files in the working directory and report totals.

    Prints the overall row count, the recent row count, and the percentage
    of rows that are recent.

    :param loop: event loop, passed through to ``count``
    """
    total = recent = 0
    for i, fn in enumerate(glob('*.csv.gz')):
        # Only filenames exactly 39 characters long are processed.
        # NOTE(review): presumably this selects one naming pattern — confirm.
        if len(fn) != 39:
            continue
        print(i + 1, fn)
        file_total, file_recent = await count(loop, fn, i)
        total += file_total
        recent += file_recent
    print(total)
    print(recent)
    # With no matching files or rows the percentage is undefined; avoid
    # the ZeroDivisionError the bare division would raise.
    if total:
        print('{:.1f}%'.format(100 * recent / total))
    else:
        print('n/a')
if __name__ == '__main__':
    # Create the loop explicitly: asyncio.get_event_loop() with no running
    # loop is deprecated in recent Python versions. The guard also keeps the
    # script from running when the module is imported.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(run(loop))
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.
Stable genius