@sacherus
Created July 29, 2019 08:07
import datetime
import gzip
import logging
from boto3 import Session
from src.settings import AWS_BUCKET_NAME, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY


class S3reader:
    def read(self, key):
        session = Session(
            aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
        logging.info('Processing file for key {}'.format(key))
        s3_client = session.client("s3")
        # Wrap the StreamingBody so GzipFile sees a clean EOF once the stream is exhausted.
        body = ReadOnce(s3_client.get_object(Bucket=AWS_BUCKET_NAME, Key=key)['Body'])
        gz_file = gzip.GzipFile(fileobj=body, mode='rb')
        return gz_file.read().decode('utf-8')


class S3List:
    def list(self, days=30 * 7, end_date=None, limit=None):
        # Default end_date at call time, not at import time.
        end_date = end_date or datetime.datetime.today()
        session = Session(
            aws_access_key_id=AWS_ACCESS_KEY_ID, aws_secret_access_key=AWS_SECRET_ACCESS_KEY)
        s3 = session.resource("s3")
        bucket = s3.Bucket(AWS_BUCKET_NAME)
        logging.info("Phase 1")
        date_list = [end_date - datetime.timedelta(days=x) for x in range(days)]
        prefixes = [date.strftime('bucket_name/%Y/%m/%d') for date in date_list]
        logging.info("{} prefixes".format(prefixes))
        i = 0
        for prefix in prefixes:
            # Newest objects first within each daily prefix.
            for file in sorted(bucket.objects.filter(Prefix=prefix), key=lambda k: k.last_modified, reverse=True):
                i += 1
                yield file.key
                if limit and i == limit:
                    return


class ReadOnce:
    """File-like wrapper that keeps returning b'' once the wrapped stream is
    exhausted, so GzipFile reading a boto3 StreamingBody stops cleanly."""

    def __init__(self, k):
        self.key = k
        self.has_read_once = False

    def read(self, size=0):
        if self.has_read_once:
            return b''
        data = self.key.read(size)
        if not data:
            self.has_read_once = True
        return data


def process_s3(pipeline_options):
    """
    Pseudocode: builds a Beam pipeline that lists S3 keys, reads and
    decompresses them, and flat-maps them through process_xmls (defined elsewhere).
    """
    import apache_beam as beam

    p = beam.Pipeline(options=pipeline_options)
    # TODO: some problems with the main session; importing inside the function as a workaround.
    from src.aws.process_key import S3reader
    xmls = p | beam.Create(S3List().list(days=2)) | beam.Map(S3reader().read)
    models = xmls | beam.FlatMap(process_xmls)
    return models
# This code fulfills my specific need. If someone finds it useful, I'm happy to
# refactor it for more general use. Tested on 4 TB of data plus Dataflow.
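

# A minimal usage sketch, not part of the original gist. It assumes the credentials
# and bucket name in src.settings are valid and that objects live under the daily
# prefixes built above: list the newest key from the last two days and decompress it.
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    recent_keys = list(S3List().list(days=2, limit=1))
    if recent_keys:
        xml_text = S3reader().read(recent_keys[0])
        print(xml_text[:200])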