Skip to content

Instantly share code, notes, and snippets.

@yohangdev
Last active July 11, 2021 16:27
Show Gist options
  • Save yohangdev/956d312de32ccbba37cf86d259b6a2d6 to your computer and use it in GitHub Desktop.
Save yohangdev/956d312de32ccbba37cf86d259b6a2d6 to your computer and use it in GitHub Desktop.
AWS Lambda (Python 3.8 runtime) for bulk processing CSV file, processing message template with dynamic variables, then push message to AWS SQS
Hello {param1}
This is your invoice number {param2}
Total cost: {param3}
import os
import logging
import json
import boto3
import re
sqs_endpoint_url = os.environ['SQS_ENDPOINT_URL']
sqs_queue_url = os.environ['SQS_QUEUE_URL']
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3 = boto3.client('s3')
sqs = boto3.client('sqs', endpoint_url=sqs_endpoint_url)
def build_message(params):
template_file = s3.get_object(Bucket='s3-bucket-name', Key='BlastTemplate.txt')
body = template_file['Body'].read().decode('utf-8')
param_index = 0
for param in params:
body = body.replace('{param'+str(param_index)+'}', params[param_index].strip())
param_index += 1
return body
def send_message(phone_number, params):
message = build_message(params)
# Send message to SQS queue
sqs.send_message(
QueueUrl=sqs_queue_url,
DelaySeconds=10,
MessageAttributes={
'PhoneNumber': {
'DataType': 'String',
'StringValue': phone_number
},
},
MessageBody=(
message
)
)
def lambda_handler(event, context):
count_success = 0
count_failed = 0
for record in event['Records']:
bucket = record['s3']['bucket']['name']
file_key = record['s3']['object']['key']
logger.info(f'IMPORT START Read File: {bucket}/{file_key}')
csv_file = s3.get_object(Bucket=bucket, Key=file_key)
rows = csv_file['Body'].read().decode('utf-8-sig').splitlines(True)
total_rows = len(rows)
logger.info(f'IMPORT OPEN FILE: {total_rows-1} rows')
row_number = 0
for row in rows:
# skip first row
if row_number == 0:
row_number += 1
continue
row = row.strip()
cols = row.split(';')
logger.info(f'IMPORT ROW {row_number}: {cols}')
phone_number = cols[0].strip()
params = cols
send_message(phone_number, params)
row_number += 1
count_success += 1
logger.info(f'IMPORT END: Total: {total_rows-1}; Success: {count_success}; FAILED: {count_failed}')
logger.info(f'IMPORT FINISH: File: {bucket}/{file_key}')
return {
'statusCode': 200,
'body': json.dumps('Success')
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment