Skip to content

Instantly share code, notes, and snippets.

@aldomatic
Created December 7, 2018 16:23
Show Gist options
  • Save aldomatic/5cfef3b4fa41fecb78168493a16819e0 to your computer and use it in GitHub Desktop.
Save aldomatic/5cfef3b4fa41fecb78168493a16819e0 to your computer and use it in GitHub Desktop.
Export DynamoDB data to CSV, upload a backup to S3, and delete the exported items from the table.
import csv
import boto3
from boto3.dynamodb.conditions import Key, Attr
from datetime import datetime
from pytz import timezone
import os
import json
# Module-level AWS handles, created once when the module is loaded.
# NOTE(review): ses_client is not referenced anywhere in the visible code —
# confirm whether it is still needed before removing it.
ses_client = boto3.client('ses')
s3_resource = boto3.resource('s3')
dynamodb_resource = boto3.resource('dynamodb')
# The DynamoDB table that is scanned for leads and from which exported
# items are later deleted.
table = dynamodb_resource.Table('client-lead-capture')
def main():
    """Back up matching leads to CSV and S3, then delete them from DynamoDB.

    Scans ``table`` for items whose ``global_campaign`` equals
    ``summer-2019-campaign`` and whose ``brand`` equals ``aws``, appends them
    to a dated CSV file in the current working directory, uploads that file to
    the ``client-leads`` bucket under ``client-lead-capture-backups/``, and --
    only when the upload reports HTTP 200 -- deletes the exported items from
    the table.

    Returns:
        dict: ``{"status": True, "file_name": <csv name>}`` on success,
        ``{"status": False}`` when the upload did not report HTTP 200, or
        ``{"status": False, "message": "No new leads found."}`` when the scan
        matched no items.
    """
    # The CSV name carries today's date so each day gets its own backup file.
    timestamps = get_timestamps()
    file_stem = 'summer_2019_leads{}'.format(timestamps.get("date_format"))
    csv_file_name = '{}.csv'.format(file_stem)

    # A single scan call returns at most one page of results; keep following
    # LastEvaluatedKey so that every matching item is exported and deleted.
    scan_kwargs = {
        'FilterExpression': (
            Attr("global_campaign").eq("summer-2019-campaign")
            & Attr("brand").eq("aws")
        ),
    }
    items = []
    while True:
        response = table.scan(**scan_kwargs)
        items.extend(response['Items'])
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break
        scan_kwargs['ExclusiveStartKey'] = last_key

    if not items:
        return {
            "status": False,
            "message": "No new leads found."
        }

    # Use the union of all attribute names (first-seen order) as the columns,
    # so an item carrying an attribute the first item lacks does not make
    # DictWriter raise ValueError.
    fieldnames = []
    seen = set()
    for item in items:
        for key in item:
            if key not in seen:
                seen.add(key)
                fieldnames.append(key)

    # Append mode is kept deliberately: a second run on the same day adds to
    # the existing file, so the re-uploaded backup still contains the rows that
    # were already deleted from the table by the earlier run.
    with open(csv_file_name, 'a', newline='') as csv_file:
        dict_writer = csv.DictWriter(csv_file, fieldnames)
        # Only a brand-new (empty) file needs the header row.
        if csv_file.tell() == 0:
            dict_writer.writeheader()
        dict_writer.writerows(items)

    # Save a backup copy of the CSV file in S3, closing the file afterwards.
    s3_object = s3_resource.Object(
        'client-leads',
        'client-lead-capture-backups/{}'.format(csv_file_name),
    )
    with open(csv_file_name, 'rb') as body:
        s3_response = s3_object.put(Body=body)

    if s3_response['ResponseMetadata']['HTTPStatusCode'] != 200:
        return {
            "status": False
        }

    # The backup is safely stored, so the exported items can now be removed.
    delete_items_from_dynamodb(items)
    return {
        "status": True,
        "file_name": str(csv_file_name)
    }
def get_timestamps():
    """Return the current date and time in the America/Chicago time zone.

    Returns:
        dict: ``date_format`` is the date rendered with ``%m-%d-%Y`` and
        ``time_format`` is the time rendered with ``%I:%M:%p``; both values
        are strings.
    """
    now_central = datetime.now(timezone('America/Chicago'))
    patterns = (
        ("date_format", '%m-%d-%Y'),
        ("time_format", '%I:%M:%p'),
    )
    return {
        label: str(now_central.strftime(pattern))
        for label, pattern in patterns
    }
def delete_items_from_dynamodb(items):
    """Delete the given items from the module-level ``table``.

    Args:
        items: Iterable of item dicts. Each must contain ``id`` and
            ``global_campaign``, which together form the key passed to
            DynamoDB (per the original author's note: ``id`` is the partition
            key and ``global_campaign`` the sort key).

    Raises:
        KeyError: If an item lacks ``id`` or ``global_campaign``.
    """
    # batch_writer groups the deletes into batched requests instead of issuing
    # one DeleteItem call per item. overwrite_by_pkeys de-duplicates repeated
    # keys within a batch, so passing the same item twice stays harmless.
    with table.batch_writer(
        overwrite_by_pkeys=['id', 'global_campaign']
    ) as batch:
        for item in items:
            batch.delete_item(
                Key={
                    'id': item['id'],
                    'global_campaign': item['global_campaign'],
                }
            )
if __name__ == '__main__':
    # Script entry point: run the export job only when executed directly,
    # not when this module is imported.
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment