Skip to content

Instantly share code, notes, and snippets.

@sweemeng
Last active August 19, 2023 05:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sweemeng/d666810c56bdad4ed52cf4602d9178ce to your computer and use it in GitHub Desktop.
Save sweemeng/d666810c56bdad4ed52cf4602d9178ce to your computer and use it in GitHub Desktop.
Example code for SQS worker
import json
import logging
import threading
import time

import boto3
class ApplicationSpecificError(Exception):
    """Raised by message processing for a failure that must not be retried.

    The worker logs the error and deletes the message from the queue, so no
    other worker will attempt to process the same message again.
    """
logger = logging.getLogger(__name__)

# Visibility timeout (seconds) requested when a message is first received.
RECEIVE_VISIBILITY_TIMEOUT = 30
# Visibility timeout (seconds) re-applied by every keep-alive heartbeat.
KEEP_ALIVE_VISIBILITY_TIMEOUT = 20
# Seconds between two keep-alive heartbeats; must stay below the timeout above.
KEEP_ALIVE_INTERVAL = 10
# Pause (seconds) after a handled message, to be nice to all services.
BETWEEN_MESSAGES_DELAY = 1
# Pause (seconds) before polling again when the queue returned nothing.
EMPTY_QUEUE_DELAY = 5


def process(data):
    """Application-specific processing of one decoded message body.

    Placeholder: replace the body with the real work. Raise
    ``ApplicationSpecificError`` for failures that must not be retried; any
    other exception stops the worker and leaves the message on the queue.
    """


def keep_alive(receipt_handle, queue_url, stop_event, client):
    """Keep extending a message's visibility timeout until told to stop.

    Intended to run in a background thread while the message is processed.

    Args:
        receipt_handle: Receipt handle of the in-flight message.
        queue_url: URL of the queue the message was received from.
        stop_event: ``threading.Event``; the loop exits once it is set.
        client: SQS client used to call ``change_message_visibility``.
    """
    while not stop_event.is_set():
        try:
            client.change_message_visibility(
                QueueUrl=queue_url,
                ReceiptHandle=receipt_handle,
                VisibilityTimeout=KEEP_ALIVE_VISIBILITY_TIMEOUT,
            )
        except client.exceptions.ClientError:
            # The call was rejected (for example the message no longer
            # exists), so there is nothing left to keep alive.
            logger.debug("killed")
            break
        # Unlike time.sleep(), this returns as soon as stop_event is set, so
        # stopping the heartbeat never waits for a full interval.
        stop_event.wait(KEEP_ALIVE_INTERVAL)


def handle_message(client, queue_url, message, handler=process):
    """Process a single received message while keeping it invisible.

    Args:
        client: SQS client.
        queue_url: URL of the queue the message was received from.
        message: One entry of the ``Messages`` list of a receive response.
        handler: Callable invoked with the JSON-decoded message body.

    Returns:
        True when the worker should keep polling: the message was processed,
        or was discarded because of an ``ApplicationSpecificError``. False
        when an unexpected error occurred and the worker should stop; the
        message is then NOT deleted, allowing for recovery.
    """
    logger.debug(message)
    receipt_handle = message['ReceiptHandle']

    stop_event = threading.Event()
    # daemon=True: a stuck heartbeat must never keep a stopped worker alive.
    thread = threading.Thread(
        target=keep_alive,
        args=(receipt_handle, queue_url, stop_event, client),
        daemon=True,
    )
    thread.start()

    try:
        try:
            body = message['Body']
            logger.debug(body)
            data = json.loads(body)
            logger.debug("working")
            handler(data)
            logger.debug("finished")
        except ApplicationSpecificError as e:
            # This error should not be recovered: fall through to the delete
            # below so that no other worker tries to process this message.
            logger.error(e)
        finally:
            # Always stop the heartbeat. Otherwise a failed message would be
            # kept invisible forever and could never be retried.
            stop_event.set()
            thread.join()
        # Reached on success and on ApplicationSpecificError only.
        client.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=receipt_handle,
        )
    except Exception as e:
        # Probably a code error: keep the message on the queue and stop the
        # worker, so that the failure does not overwhelm the queue.
        logger.exception(e)
        return False
    return True


def run_worker(client, queue_url):
    """Poll the queue and handle one message at a time until a fatal error.

    Args:
        client: SQS client.
        queue_url: URL of the queue to consume.
    """
    while True:
        # NOTE(review): WaitTimeSeconds=0 returns immediately when nothing is
        # available; a positive value would long-poll instead - confirm which
        # one is wanted.
        response = client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=1,
            VisibilityTimeout=RECEIVE_VISIBILITY_TIMEOUT,
            WaitTimeSeconds=0,
        )
        messages = response.get('Messages')
        if not messages:
            # Key absent or list empty: nothing to do right now.
            time.sleep(EMPTY_QUEUE_DELAY)
            continue
        if not handle_message(client, queue_url, messages[0]):
            break
        # Be nice to all services.
        time.sleep(BETWEEN_MESSAGES_DELAY)


def main():
    """Create the SQS client and run the worker loop."""
    sqs = boto3.client('sqs')
    # NOTE: queue_url_ is not defined in this example; the application must
    # provide the URL of the queue to consume.
    run_worker(sqs, queue_url_)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment