Skip to content

Instantly share code, notes, and snippets.

@recalde
Last active February 28, 2024 23:30
Show Gist options
  • Save recalde/5253890b491c15a102263a697ec83aaa to your computer and use it in GitHub Desktop.
Account name cache
import boto3
import csv
from datetime import datetime, timedelta
from io import StringIO
def scan_dynamodb_table(dynamodb_table_name, last_execution_time=None):
    """Scan a DynamoDB table for account names recorded in a time window.

    Args:
        dynamodb_table_name: Name of the DynamoDB table to scan.
        last_execution_time: Optional ``datetime``; when given, the window
            starts there, otherwise 30 days before now.

    Returns:
        dict mapping lowercase first letter -> set of account names whose
        ``timestamp`` attribute falls inside [start_time, end_time].
    """
    dynamodb = boto3.resource('dynamodb')
    table = dynamodb.Table(dynamodb_table_name)

    # Window: from the last run (or 30 days back) up to now.
    # NOTE(review): datetime.utcnow() is naive (and deprecated in 3.12);
    # kept for consistency with the rest of the file — confirm timestamps
    # in the table are also naive-UTC ISO strings.
    end_time = datetime.utcnow()
    start_time = last_execution_time if last_execution_time else end_time - timedelta(days=30)

    scan_kwargs = {
        'FilterExpression': '#timestamp between :start_time and :end_time',
        'ExpressionAttributeNames': {'#timestamp': 'timestamp'},
        'ExpressionAttributeValues': {
            ':start_time': start_time.isoformat(),
            ':end_time': end_time.isoformat(),
        },
    }

    # Bug fix: the original returned {first_letter: set()} with *empty* sets,
    # discarding every account name it had just scanned.  Accumulate the
    # names, bucketed by lowercase first letter, and follow LastEvaluatedKey
    # so results beyond the 1 MB scan page limit are not silently dropped.
    buckets = {}
    while True:
        response = table.scan(**scan_kwargs)
        for item in response['Items']:
            account_name = item['account_name']
            if account_name:  # empty name has no first letter to bucket by
                buckets.setdefault(account_name[0].lower(), set()).add(account_name)
        last_key = response.get('LastEvaluatedKey')
        if not last_key:
            break
        scan_kwargs['ExclusiveStartKey'] = last_key
    return buckets
def merge_and_upload_to_s3(new_data, s3_bucket, last_execution_time):
    """Merge per-letter account-name sets into the CSV shards stored on S3.

    Args:
        new_data: dict mapping lowercase first letter -> set of account names.
        s3_bucket: Target S3 bucket name.
        last_execution_time: ``datetime`` persisted as the run watermark.
    """
    s3 = boto3.client('s3')
    for first_letter, new_names in new_data.items():
        s3_key = f'distinct_account_names/{first_letter}'

        # Download the existing shard; a missing key just means "first run".
        try:
            obj = s3.get_object(Bucket=s3_bucket, Key=s3_key)
            # Bug fix: csv.reader yields *lists*, which are unhashable, so the
            # original set(csv.reader(...)) raised TypeError.  Each row holds
            # a single account name in its first column.
            existing_data = {
                row[0]
                for row in csv.reader(obj['Body'].read().decode('utf-8').splitlines())
                if row
            }
        except s3.exceptions.NoSuchKey:
            existing_data = set()

        # Merge old and new names, sorted alphabetically for stable output.
        merged_data = sorted(existing_data.union(new_names))

        csv_buffer = StringIO()
        csv_writer = csv.writer(csv_buffer)
        for account_name in merged_data:
            csv_writer.writerow([account_name])

        # Bug fix: upload_fileobj requires a *binary* file object; the original
        # passed a text-mode StringIO.  put_object accepts a str Body directly.
        s3.put_object(Bucket=s3_bucket, Key=s3_key, Body=csv_buffer.getvalue())

    # Persist the watermark so the next run only scans newer entries.
    s3.put_object(
        Bucket=s3_bucket,
        Key='distinct_account_names/last_execution_time',
        Body=last_execution_time.isoformat(),
    )
def get_last_execution_time_from_s3(s3_bucket):
    """Read the previous run's watermark from S3.

    Args:
        s3_bucket: Bucket holding the ``distinct_account_names/`` prefix.

    Returns:
        The stored ``datetime``, or ``None`` when no watermark object exists.
    """
    client = boto3.client('s3')
    watermark_key = 'distinct_account_names/last_execution_time'
    try:
        body = client.get_object(Bucket=s3_bucket, Key=watermark_key)['Body']
    except client.exceptions.NoSuchKey:
        # No watermark yet — caller treats this as "scan the default window".
        return None
    return datetime.fromisoformat(body.read().decode('utf-8'))
def get_accounts_starting_with_partial_name(partial_name, s3_bucket):
    """Return cached account names that start with ``partial_name``.

    Looks up only the single per-letter CSV shard selected by the prefix's
    first character (lowercased), so matching is case-sensitive beyond that.

    Args:
        partial_name: Prefix to match against stored account names.
        s3_bucket: Bucket holding the ``distinct_account_names/`` shards.

    Returns:
        list of matching account names (empty on no match or missing shard).
    """
    # Robustness fix: an empty prefix would raise IndexError on [0] below.
    if not partial_name:
        return []

    s3 = boto3.client('s3')
    accounts = []
    first_letter = partial_name[0].lower()
    s3_key = f'distinct_account_names/{first_letter}'
    try:
        obj = s3.get_object(Bucket=s3_bucket, Key=s3_key)
        rows = csv.reader(obj['Body'].read().decode('utf-8').splitlines())
        # Bug fix: csv.reader yields list rows; the original called
        # .startswith on a *list* (AttributeError).  Match the first column.
        accounts.extend(
            row[0] for row in rows if row and row[0].startswith(partial_name)
        )
    except s3.exceptions.NoSuchKey:
        # Missing shard simply means no account starts with that letter.
        pass
    return accounts
def main():
    """Refresh the distinct-account-name cache stored on S3."""
    table_name = 'your_dynamodb_table_name'
    bucket = 'your_s3_bucket_name'

    # Resume from the previous run's watermark when one exists.
    previous_run = get_last_execution_time_from_s3(bucket)
    fresh_names = scan_dynamodb_table(table_name, previous_run)
    merge_and_upload_to_s3(fresh_names, bucket, datetime.utcnow())


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment