Skip to content

Instantly share code, notes, and snippets.

@peterbe peterbe/
Created May 15, 2018

What would you like to do?
import csv
import datetime
import zlib
import asyncio
from glob import glob
import aiofiles
# Number of compressed bytes read from disk per iteration.
CHUNK_SIZE = 1024 * 256  # 256 KB

# Rows whose timestamp is later than this moment are counted as "recent"
# (roughly the last six months).  A bare negative timedelta cannot be
# compared with the datetime that strptime() produces (TypeError), so the
# offset is anchored to the current time.
# NOTE(review): this is a naive local-time datetime -- confirm it matches
# the timezone convention of the parsed timestamps.
cutoff = datetime.datetime.now() - datetime.timedelta(days=6 * 30)
async def read_csv(input_generator):
    """Yield parsed CSV rows from an async stream of raw bytes.

    :param input_generator: async generator of raw bytes
    :return: async generator yielding one list of column strings per row
    """
    async for lines in split_lines(input_generator):
        # csv.reader accepts any iterable of strings, so every batch of
        # complete lines is parsed on its own.
        for row in csv.reader(lines):
            yield row
async def split_lines(stream):
    """Split the chunks of bytes on new lines.

    :param stream: async iterable yielding ``bytes`` chunks of UTF-8 text
    :return: async generator yielding lists of complete lines (``str``,
        without the trailing newline); chunks that do not complete any
        line yield nothing
    :raises UnicodeDecodeError: if the data is not valid UTF-8
    """
    import codecs

    # An incremental decoder buffers a multi-byte character that is cut in
    # half by a chunk boundary instead of raising on the partial sequence.
    decoder = codecs.getincrementaldecoder('utf-8')()
    leftover = ''
    async for chunk in stream:
        chunk_str = leftover + decoder.decode(chunk)
        # Leading newlines of the combined text are dropped (kept from the
        # original behaviour).
        chunk_str = chunk_str.lstrip('\n')
        lines = chunk_str.split('\n')
        # Everything after the last \n belongs to the next line.
        leftover = lines.pop()
        if lines:
            yield lines
    # Flush the decoder (raises if the stream ended inside a character) and
    # emit a final line that was not terminated by a newline.
    leftover += decoder.decode(b'', final=True)
    if leftover:
        yield [leftover]
async def stream(loop, fn, chunk_size=CHUNK_SIZE):
    """Yield decompressed chunks of bytes from the gzip file ``fn``.

    :param loop: event loop; not used in this body, kept so existing
        callers keep working
    :param fn: path of the gzip-compressed file to read
    :param chunk_size: number of compressed bytes to read per iteration
    :return: async generator of non-empty ``bytes`` objects

    NOTE(review): the ``aiofiles.open(fn, 'rb')`` and ``read(chunk_size)``
    calls were reconstructed from a truncated source -- confirm against the
    original.
    """
    # MAX_WBITS | 16 tells zlib to expect a gzip header and trailer.
    decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
    async with aiofiles.open(fn, 'rb') as fh:
        while 'there are chunks to read':
            gzip_chunk = await fh.read(chunk_size)
            if not gzip_chunk:
                # End of file: without this the loop would never terminate.
                break
            csv_chunk = decompressor.decompress(gzip_chunk)
            if csv_chunk:
                yield csv_chunk
    # Emit whatever output the decompressor is still holding on to.
    tail = decompressor.flush()
    if tail:
        yield tail
# Walk the rows produced by read_csv(stream(loop, fn)) and return the tuple
# (total, count): `total` is incremented once per row and `count` once per
# row whose `lastmodified` is greater than the module-level `cutoff`.
# `i` is only used for the progress print.
# NOTE(review): the indentation of this block was lost, so the exact nesting
# of the statements below (presumably the `if` and `total += 1` sit inside
# the `async for`) must be confirmed against the original.
async def count(loop, fn, i):
count = total = 0
print(i, fn)
async for line in read_csv(stream(loop, fn)):
# NOTE(review): the strptime( call below is truncated in this source -- its
# arguments (which column of `line` to parse and the date format string) and
# the closing parenthesis are missing and cannot be recovered from here.
# Restore them from the original before running.
lastmodified = datetime.datetime.strptime(
if lastmodified > cutoff:
count += 1
total += 1
return total, count
async def run(loop):
    """Count rows in the matching ``*.csv.gz`` files and print the recent share.

    Globs ``*.csv.gz`` in the current working directory, awaits ``count()``
    for every file whose name is exactly 39 characters long, and prints the
    percentage of recent rows among all rows.

    :param loop: event loop, passed through to ``count()``
    """
    total = recent = 0
    for i, fn in enumerate(glob('*.csv.gz')):
        # NOTE(review): indentation was lost in the source; presumably the
        # whole per-file body sits under the length check -- confirm.
        if len(fn) != 39:
            continue
        print(i + 1, fn)
        t, c = await count(loop, fn, i)
        total += t
        recent += c
    if not total:
        # No files matched or they held no rows; a percentage is undefined
        # and the division below would raise ZeroDivisionError.
        print('No rows counted')
        return
    print('{:.1f}%'.format(100 * recent / total))
loop = asyncio.get_event_loop()
# NOTE(review): the source creates the loop but never drives run(); the
# call below is presumably what was lost -- confirm against the original.
if __name__ == '__main__':
    loop.run_until_complete(run(loop))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.