SQS Exponential Backoff

Usage

I recommend using a virtualenv:

$ python -m venv venv
# ... assuming that 'python' points to a Python 3.7 installation

Then activate it:

$ source venv/bin/activate

Next, install the requirements:

$ pip install -r requirements.txt
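
The requirements.txt referenced above isn't reproduced on this page. If you need to recreate it, the script's only third-party dependency is boto3 (botocore comes with it), so a minimal file is just the following (pin a version if you want reproducible installs; none is assumed here):

boto3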

Then run localstack:

$ docker run -t -i -m 1g --rm \
    -e "SERVICES=s3,dynamodb,kinesis,sqs,sns" \
    -e "DEBUG=1" \
    -p "4566-4578:4566-4578" \
    --name localstack localstack/localstack:0.11.3

Then just run the script:

$ python sqs_backoff.py
sqs_backoff.py

import time

import boto3
from botocore.exceptions import ClientError

sqs = boto3.resource("sqs", endpoint_url="http://localhost:4576")


def create_queue(name, attributes=None):
    if not attributes:
        attributes = {}
    try:
        queue = sqs.create_queue(QueueName=name, Attributes=attributes)
        print(f"Created queue '{name}' with URL = {queue.url}")
    except ClientError as error:
        print(f"Couldn't create queue named '{name}'.")
        raise error
    else:
        return queue


def send_message(queue, message_body, message_attributes=None):
    if not message_attributes:
        message_attributes = {}
    try:
        response = queue.send_message(MessageBody=message_body, MessageAttributes=message_attributes)
    except ClientError as error:
        print(f"Send message failed: {message_body}")
        raise error
    else:
        return response


def receive_messages(queue, max_number=10, wait_time=2):
    try:
        messages = queue.receive_messages(
            AttributeNames=["All"],
            MessageAttributeNames=["All"],
            MaxNumberOfMessages=max_number,
            WaitTimeSeconds=wait_time
        )
        for msg in messages:
            print(f"Received message: ID = {msg.message_id}, body = {msg.body}")
    except ClientError as error:
        print(f"Couldn't receive messages from queue: '{queue}'")
        raise error
    else:
        return messages


# Might want to introduce randomness here, if synchronized concurrent clients are a possibility.
# See: https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter
def get_exponential_backoff_wait_time(n, base_wait_time_sec, max_wait_time_sec):
    return int(min((2 ** n) + base_wait_time_sec, max_wait_time_sec))
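
# --- Added sketch, not part of the original gist ---
# The note above suggests adding randomness when many clients may back off in lockstep.
# A "full jitter" variant (per the linked AWS blog post) picks a uniformly random wait
# between 0 and the capped exponential value; the function name here is an assumption.
import random


def get_full_jitter_backoff_wait_time(n, base_wait_time_sec, max_wait_time_sec):
    capped = min((2 ** n) + base_wait_time_sec, max_wait_time_sec)
    # A uniformly random wait in [0, capped] spreads out retries from concurrent consumers.
    return int(random.uniform(0, capped))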

def main():
    max_attempts = 5
    visibility_timeout = 5
    max_wait_time = 60

    # Send a single message to the queue
    queue = create_queue("backoff_test_queue", {"VisibilityTimeout": str(visibility_timeout)})
    queue.purge()
    send_message(queue, "test")

    start_time = time.time_ns()
    num_attempts = 0
    wait_time = 0
    total_wait_time = 0

    # Listen for messages
    while num_attempts < max_attempts:
        messages = receive_messages(queue)

        # Empty retrieve - do nothing
        if len(messages) == 0:
            continue

        msg = messages[0]

        # This is where you would do something with the message. If that 'something' was successful, the message
        # would then be deleted from the queue. In this example, we just do nothing to simulate a 'failure', and
        # just leave the message on the queue to be reprocessed.
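        # Note (added, not in the original gist): on success the handler would instead call
        # msg.delete() so the message is removed from the queue rather than redelivered.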
print(f"Attempt: {num_attempts}")
num_attempts = int(msg.attributes["ApproximateReceiveCount"])
wait_time = get_exponential_backoff_wait_time(num_attempts, visibility_timeout, max_wait_time)
total_wait_time += wait_time
print(f"Wait time: {wait_time}")
# Update the message with the new visibility timeout. This is the 'backoff' step
msg.change_visibility(VisibilityTimeout=wait_time)
end_time = time.time_ns()
elapsed_sec = (end_time - start_time) // 1_000_000_000
theoretical_wait_time_sec = total_wait_time - wait_time
print(f"Max retries reached. Waited {elapsed_sec} sec. Expected wait {theoretical_wait_time_sec} sec")
queue.purge()
if __name__ == "__main__":
main()