Skip to content

Instantly share code, notes, and snippets.

@get-data-
Created December 6, 2019 16:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save get-data-/87a66ef48063b7b9ebdfa0ef339ef080 to your computer and use it in GitHub Desktop.
Save get-data-/87a66ef48063b7b9ebdfa0ef339ef080 to your computer and use it in GitHub Desktop.
lambda handler for sqs msg handling
# handler.py
"""AWS Lambda function for transferring files from an S3 bucket to another
S3 bucket on a CreateObject event.
Required env vars:
:param SQS_QUEUE_URL: URL of SQS queue
Optional env vars:
:param LOGGING_LEVEL: string log level (default ERROR)
:param NUM_MESSAGES: int # of msg to retrieve per execution (default 1)
:validation NUM_MESSAGES: accepts int 1 - 10
"""
import os
import boto3
import logging
from botocore.exceptions import ClientError
logger = logging.getLogger()
logger.basicConfig(
level=logging.ERROR,
format='%(levelname)s: %(asctime)s: %(message)s')
logger.setLevel(os.getenv('LOGGING_LEVEL', 'ERROR'))
def main(event, context):
"""Retrieve messages from a dead letter queue to retry.
This is the Lambda entry point. It queries an sqs queue and retries failed
S3 PUT events caused by a delay in write consistency. (if successful), it
deletes messages from the queue after invoking the inbound-sync.
:param event: dict, the event payload delivered by Lambda.
:param context: a LambdaContext object - unused.
"""
# Assign this value before running the program
sqs_queue_url = os.getenv('SQS_QUEUE_URL', None)
if not sqs_queue_url:
raise ValueError('SQS_QUEUE_URL Environment variable missing')
num_messages = os.getenv('NUM_MESSAGES', 1)
logger.info(f"Inbound Sync DLQ Retry: Looking into queue")
# Retrieve SQS messages
msgs = retrieve_sqs_messages(sqs_queue_url, num_messages)
if msgs is not None:
for msg in msgs:
logging.info(
f'SQS: Message ID: {msg["MessageId"]}, '
f'Contents: {msg["Body"]}')
# Remove the message from the queue
delete_sqs_message(sqs_queue_url, msg['ReceiptHandle'])
def retrieve_sqs_messages(
sqs_queue_url, num_msgs=1, wait_time=0, visibility_time=5):
"""Retrieve messages from an SQS queue
The retrieved messages are not deleted from the queue.
:param sqs_queue_url: String URL of existing SQS queue
:param num_msgs: Number of messages to retrieve (1-10)
:param wait_time: Number of seconds to wait if no messages in queue
:param visibility_time: Number of seconds to make retrieved messages
hidden from subsequent retrieval requests
:return: List of retrieved messages. If no messages are available,
returned list is empty. If error, returns None.
"""
# Validate number of messages to retrieve
if num_msgs < 1:
num_msgs = 1
elif num_msgs > 10:
num_msgs = 10
# Retrieve messages from an SQS queue
sqs_client = boto3.client('sqs')
try:
msgs = sqs_client.receive_message(
QueueUrl=sqs_queue_url,
MaxNumberOfMessages=num_msgs,
WaitTimeSeconds=wait_time,
VisibilityTimeout=visibility_time)
except ClientError as e:
logging.error(e)
# Return the list of retrieved messages
return msgs['Messages']
def delete_sqs_message(sqs_queue_url, msg_receipt_handle):
"""Delete a message from an SQS queue
:param sqs_queue_url: String URL of existing SQS queue
:param msg_receipt_handle: Receipt handle value of retrieved message
"""
# Delete the message from the SQS queue
sqs_client = boto3.client('sqs')
sqs_client.delete_message(
QueueUrl=sqs_queue_url,
ReceiptHandle=msg_receipt_handle)
if __name__ == "__main__":
import sys
import json
try:
with open(sys.argv[1]) as f:
event = json.loads(f.read())
handler = logging.StreamHandler(sys.stdout)
logger.addHandler(handler)
main(event, '')
except Exception as ex:
logger.exception(ex)
raise SystemExit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment