Skip to content

Instantly share code, notes, and snippets.

@ervinne13
Last active April 16, 2021 14:03
Show Gist options
  • Save ervinne13/76ad8d1cd5c1ceb551880ded5b209180 to your computer and use it in GitHub Desktop.
Save ervinne13/76ad8d1cd5c1ceb551880ded5b209180 to your computer and use it in GitHub Desktop.
Sample Lambda Function for Processing CSVs from an S3 Bucket
from os import environ
from urllib.parse import unquote_plus

import boto3
from pandas import read_csv

from mongodb import initialize_connection
from operations_writer import OperationsWriter
from worklogs_operations_generator import WorklogsOperationsGenerator
from worklogs_repository import WorklogsRepository
def lambda_handler(event, context):
    """AWS Lambda entry point: turn the CSV named by an S3 event into operations.

    Only the first record of ``event['Records']`` is processed. Its CSV is
    loaded into a DataFrame, converted into worklog create/update operations,
    and those operations are written to MongoDB. Both the repository reads and
    the operations write use one session inside one transaction.

    Args:
        event: S3 notification event delivered by Lambda.
        context: Lambda context object (unused).

    Returns:
        ``{'statusCode': 200, 'body': 'Success'}`` once the work completes.
    """
    s3_client = boto3.client('s3')
    assert_event_has_record(event)
    first_record = event['Records'][0]
    data_frame = get_csv_contents_as_data_frame(first_record, s3_client)

    with initialize_connection() as client, client.start_session() as session:
        with session.start_transaction():
            worklogs_db = client.worklogs
            writer = OperationsWriter(worklogs_db, session)
            generator = WorklogsOperationsGenerator(
                WorklogsRepository(worklogs_db, session)
            )
            writer.write_batch(generator.generate_from_data_frame(data_frame))

    return {'statusCode': 200, 'body': 'Success'}
def assert_event_has_record(event):
    """Ensure *event* carries at least one S3 record.

    Args:
        event: Lambda event dict expected to contain a non-empty ``Records``
            list.

    Raises:
        Exception: if ``Records`` is missing, ``None``, or empty. A missing
            key previously surfaced as an uninformative ``KeyError``.
    """
    if not event.get('Records'):
        raise Exception("event must have a record about an s3 object.")
def get_csv_contents_as_data_frame(record, s3):
    """Download the CSV object referenced by an S3 event record as a DataFrame.

    Args:
        record: One entry of an S3 event's ``Records`` list.
        s3: boto3 S3 client used to fetch the object.

    Returns:
        The parsed ``pandas.DataFrame``.
    """
    bucket_name = record['s3']['bucket']['name']
    # S3 event notifications URL-encode the object key (e.g. a space arrives
    # as '+'), so decode it before using it to address the object.
    file_name = unquote_plus(record['s3']['object']['key'])
    # Fetch through the injected client, not an s3:// URL. This uses the client
    # the caller built and avoids URL parsing of keys containing '?' or '#'.
    response = s3.get_object(Bucket=bucket_name, Key=file_name)
    return read_csv(response['Body'], sep=',')
from os import environ
from pymongo import MongoClient
def initialize_connection():
    """Create a MongoClient from the ``mongodb_conn_string`` environment variable.

    Returns:
        A new ``MongoClient``. The caller owns it and is responsible for
        closing it.

    Raises:
        KeyError: if ``mongodb_conn_string`` is not set in the environment.
    """
    return MongoClient(environ['mongodb_conn_string'])
from datetime import datetime
class OperationsWriter:
    """Persist generated operation batches to the ``operations`` collection.

    Every write is issued with the session given at construction, so it takes
    part in whatever transaction that session has open.
    """

    def __init__(self, mongo_db, mongo_sess):
        # mongo_db: database handle exposing an ``operations`` collection.
        # mongo_sess: client session passed to every write.
        self.mongo_db = mongo_db
        self.mongo_sess = mongo_sess

    def write_batch(self, batch):
        """Insert *batch* as one document stamped with its execution time.

        Args:
            batch: Mapping whose keys become top-level fields of the inserted
                document. An ``executed_at`` key in *batch* is overwritten.
        """
        from datetime import timezone  # local import keeps this fix self-contained

        # PyMongo treats naive datetimes as UTC, so a naive local ``now()``
        # records the wrong instant on any non-UTC host. Use an aware UTC time.
        # NOTE(review): the whole batch is stored as a single document, so very
        # large CSVs may hit MongoDB's per-document size limit.
        document = {**batch, 'executed_at': datetime.now(timezone.utc)}
        self.mongo_db.operations.insert_one(document, session=self.mongo_sess)
class WorklogsOperationsGenerator:
    """Turn worklog DataFrame rows into ``create`` / ``update`` operation records.

    Whether a row becomes a create or an update is decided by asking the
    repository which of the frame's ids are already stored.
    """

    def __init__(self, worklogs_repository):
        # Object exposing ``split_id_set_by_new(id_set)``, which returns a dict
        # with ``'existing'`` and ``'new'`` id collections.
        self.worklogs_repository = worklogs_repository

    def generate_from_data_frame(self, df):
        """Build an operation for every row of *df*.

        Args:
            df: DataFrame providing the columns read by ``create_operation``
                (``id``, ``date``, ``project``, ``type``, ``minutesspent``,
                ``employee_id``, ``display_name``).

        Returns:
            ``{'updates': [...], 'creates': [...]}``, each list in row order.

        Raises:
            Exception: if a row id is in neither side of the repository split.
        """
        # One repository query classifies every id in the frame.
        split_ids = self.worklogs_repository.split_id_set_by_new(set(df['id'].tolist()))
        ops_by_action = {'update': [], 'create': []}
        for _, wrklog_row in df.iterrows():
            op = self.create_operation(split_ids, wrklog_row)
            bucket = ops_by_action.get(op['action'])
            if bucket is not None:
                bucket.append(op)
        return {
            'updates': ops_by_action['update'],
            'creates': ops_by_action['create'],
        }

    def create_operation(self, split_ids, wrklog_row):
        """Map one CSV row to an operation record.

        Returns:
            Dict with ``resource``, ``worklog_id``, ``action`` and a ``body``
            that renames ``minutesspent`` to ``mins_spent`` and nests the
            employee fields under ``employee``.
        """
        action = self.get_action_or_fail(split_ids, wrklog_row)
        body = {
            'id': wrklog_row['id'],
            'date': wrklog_row['date'],
            'project': wrklog_row['project'],
            'type': wrklog_row['type'],
            'mins_spent': wrklog_row['minutesspent'],
            'employee': {
                'id': wrklog_row['employee_id'],
                'name': wrklog_row['display_name'],
            },
        }
        return {
            'resource': 'worklog',
            'worklog_id': wrklog_row['id'],
            'action': action,
            'body': body,
        }

    def get_action_or_fail(self, split_ids, worklog):
        """Return ``'update'`` for stored ids and ``'create'`` for new ones.

        Raises:
            Exception: if the id appears in neither ``split_ids['existing']``
                nor ``split_ids['new']``.
        """
        # Named worklog_id rather than ``id`` to avoid shadowing the builtin.
        worklog_id = worklog['id']
        if worklog_id in split_ids['existing']:
            return 'update'
        if worklog_id in split_ids['new']:
            return 'create'
        raise Exception(f'There was an issue in the split_ids when processing worklog {worklog_id}')
# TODO: Check out inversion of control in python and decouple mongodb or at least
# make this as an interface implementation
class WorklogsRepository:
    """Queries against the ``worklogs`` collection, all run in one session."""

    def __init__(self, mongo_db, mongo_sess):
        # mongo_db: database handle exposing a ``worklogs`` collection.
        # mongo_sess: client session forwarded to every query.
        self.mongo_db = mongo_db
        self.mongo_sess = mongo_sess

    # Warning! for testing purposes only!
    def print_all_worklogs(self):
        """Print every document in the ``worklogs`` collection (debug aid)."""
        cursor = self.mongo_db.worklogs.find({}, session=self.mongo_sess)
        for document in cursor:
            print(document)

    def split_id_set_by_new(self, id_set):
        """Partition *id_set* into ids already stored and ids not yet stored.

        Args:
            id_set: Set of worklog ids to classify.

        Returns:
            ``{'existing': set, 'new': set}``. ``existing`` holds the ids the
            query found, and ``new`` is ``id_set`` minus those.
        """
        worklogs = self.mongo_db.worklogs
        matches = worklogs.find(
            {'id': {'$in': list(id_set)}},
            {'id': 1},  # only the id field is needed from each match
            session=self.mongo_sess,
        )
        stored = {document['id'] for document in matches}
        return {'existing': stored, 'new': id_set - stored}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment