Skip to content

Instantly share code, notes, and snippets.

@AlJohri
Created August 8, 2019 22:20
Show Gist options
  • Star 0 (you must be signed in to star a gist)
  • Fork 0 (you must be signed in to fork a gist)
  • Save AlJohri/654e20d89618ce2576a5e74a3eefec20 to your computer and use it in GitHub Desktop.
Save AlJohri/654e20d89618ce2576a5e74a3eefec20 to your computer and use it in GitHub Desktop.
import io
import csv
import zlib
import boto3
from tqdm.auto import tqdm
# Module-level boto3 S3 resource; the script at the bottom of this file uses it
# to look up the object it scans (s3.Object(...)).
s3 = boto3.resource('s3')
def iterable_to_stream(iterable, buffer_size=io.DEFAULT_BUFFER_SIZE):
    """
    Wrap an iterable of bytestrings in a read-only, buffered binary stream.

    The result is an ``io.BufferedReader`` over a minimal ``io.RawIOBase``
    subclass, so it can be passed anywhere a binary file object is expected
    (for example to ``io.TextIOWrapper``). Chunks are pulled from the
    iterable lazily, only as the reader asks for more data.

    Args:
        iterable: Any iterable (list, generator, iterator, ...) yielding
            bytes-like chunks. Empty chunks are skipped rather than being
            mistaken for end-of-stream.
        buffer_size: Buffer size handed to ``io.BufferedReader``.

    Returns:
        An ``io.BufferedReader`` whose contents are the concatenated chunks.
    """
    # iter() accepts any iterable; calling next() directly on e.g. a list
    # would raise TypeError.
    iterator = iter(iterable)

    class IterStream(io.RawIOBase):
        def __init__(self):
            super().__init__()
            # Tail of the most recent chunk that did not fit into the
            # caller's buffer on the previous readinto() call.
            self.leftover = b""

        def readable(self):
            return True

        def readinto(self, b):
            # RawIOBase contract: copy at most len(b) bytes into b and return
            # the count; returning 0 signals EOF to the buffered layer.
            size = len(b)
            chunk = self.leftover
            # Skip empty chunks: returning 0 for one of them would be read
            # as EOF and silently truncate the stream.
            while not chunk:
                try:
                    chunk = next(iterator)
                except StopIteration:
                    return 0  # iterable exhausted: real EOF
            output, self.leftover = chunk[:size], chunk[size:]
            b[:len(output)] = output
            return len(output)

    return io.BufferedReader(IterStream(), buffer_size=buffer_size)
def _inflate_chunks(chunks):
    """Yield zlib-decompressed, non-empty output for an iterable of compressed chunks."""
    decompressor = zlib.decompressobj()
    for chunk in chunks:
        data = decompressor.decompress(chunk)
        # A compressed chunk may produce no output yet; forwarding b'' could
        # be mistaken for end-of-stream by the consumer.
        if data:
            yield data
    # Emit whatever the decompressor still holds once the input is exhausted.
    tail = decompressor.flush()
    if tail:
        yield tail


def stream_s3_obj(obj):
    """
    Open an S3 object as a streaming UTF-8 text file.

    The object body is consumed incrementally through ``iter_chunks()``
    instead of being read into memory in one piece. If the object's key
    contains ``'deflate'``, the chunks are zlib-decompressed on the fly.

    Args:
        obj: An object exposing ``.key`` and ``.get()`` whose result has a
            ``'Body'`` supporting ``iter_chunks()`` (as a boto3 ``s3.Object``
            does).

    Returns:
        An ``io.TextIOWrapper`` (UTF-8, lines split on ``'\\n'`` only) over
        the object's (decompressed) contents.
    """
    chunks = obj.get()['Body'].iter_chunks()
    if 'deflate' in obj.key:
        chunks = _inflate_chunks(chunks)
    byte_stream = iterable_to_stream(chunks)
    return io.TextIOWrapper(byte_stream, encoding='utf-8', newline='\n')
# Scan one tab-separated export in S3 and report each distinct value of the
# third column (presumably the newsletter name — confirm against the table
# schema) the first time it appears.
obj = s3.Object('wp-dw-datasets', 'wp_identity_newsletters/000000_0.deflate')
unique_newsletters = set()
with stream_s3_obj(obj) as text_stream:
    for row in tqdm(csv.reader(text_stream, delimiter='\t')):
        # csv.reader yields [] for blank lines; rows without a third column
        # are skipped instead of raising IndexError.
        if len(row) < 3:
            continue
        name = row[2]
        if name not in unique_newsletters:
            unique_newsletters.add(name)
            # tqdm.write prints without clobbering the active progress bar,
            # unlike a bare print() inside the loop.
            tqdm.write(f"added {name}. {len(unique_newsletters)} unique newsletters found thus far")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment