Skip to content

Instantly share code, notes, and snippets.

@mauriciosl
Last active October 10, 2022 19:54
Show Gist options
  • Save mauriciosl/42ceaf1a6ab0d66b88616900b825cce4 to your computer and use it in GitHub Desktop.
Save mauriciosl/42ceaf1a6ab0d66b88616900b825cce4 to your computer and use it in GitHub Desktop.
# This script subscribes a queue to a given SNS topic on LocalStack and prints the messages received within 20 seconds.
import sys
import boto3
# Names of the region and of the SQS queue this script listens on.
region = 'us-west-2'
queue_name = 'listening-queue'

# ARN and URL of that queue, written out for account id 000000000000 on
# the local endpoint.
queue_arn = f'arn:aws:sqs:{region}:000000000000:{queue_name}'
queue_url = f'http://localhost:4566/000000000000/{queue_name}'

# Both clients share the same region and local endpoint.
_client_options = {
    'region_name': region,
    'endpoint_url': 'http://localhost:4566',
}
sns = boto3.client('sns', **_client_options)
sqs = boto3.client('sqs', **_client_options)
def assure_queue_exists(name: str) -> None:
    """Ask SQS to create the queue called ``name``.

    The request goes through the module-level ``sqs`` client and the
    response is discarded.
    """
    sqs.create_queue(
        QueueName=name,
    )
def assure_subscription(topic_arn: str) -> None:
    """Subscribe the listening queue to the topic ``topic_arn``.

    The subscription uses the ``sqs`` protocol with the module-level
    ``queue_arn`` as its endpoint. The response is discarded.
    """
    subscription = {
        'TopicArn': topic_arn,
        'Protocol': 'sqs',
        'Endpoint': queue_arn,
    }
    sns.subscribe(**subscription)
def assure_topic(name: str) -> None:
    """Ask SNS to create the topic called ``name``.

    The request goes through the module-level ``sns`` client and the
    response is discarded.
    """
    sns.create_topic(
        Name=name,
    )
def get_messages(wait_seconds=20):
    """Yield the body of every message that reaches the queue in the window.

    The queue at ``queue_url`` is long-polled repeatedly until
    ``wait_seconds`` have elapsed. A single ``receive_message`` call is not
    enough: it returns as soon as anything is available and, unless
    ``MaxNumberOfMessages`` is given, hands back at most one message, so
    only the first message of the window would ever be seen.

    Each message is deleted from the queue before its body is yielded.

    Args:
        wait_seconds: Length of the listening window in seconds. At least
            one poll is always made, even when this is zero. The final poll
            is rounded up to a whole second, so the window may overrun by
            just under one second.

    Yields:
        The ``Body`` string of each received message.
        NOTE(review): the subscription does not appear to enable raw
        message delivery, so each body is presumably the SNS JSON
        envelope rather than the bare payload -- confirm.
    """
    import math
    import time

    deadline = time.monotonic() + wait_seconds
    while True:
        remaining = max(0.0, deadline - time.monotonic())
        response = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            # SQS accepts whole seconds only, capped at 20 per call.
            WaitTimeSeconds=min(20, math.ceil(remaining)),
        )
        for message in response.get('Messages', []):
            sqs.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle'],
            )
            yield message['Body']
        if time.monotonic() >= deadline:
            return
def setup(topic_name: str) -> None:
    """Create the topic and the queue, then subscribe the queue to the topic.

    The topic ARN is assembled from the module-level ``region``, the fixed
    account id ``000000000000`` and ``topic_name``.
    """
    assure_topic(topic_name)
    assure_queue_exists(queue_name)
    assure_subscription(
        f'arn:aws:sns:{region}:000000000000:{topic_name}'
    )
def main(topic_name: str) -> None:
    """Wire up the topic/queue pair, then print each message body received."""
    setup(topic_name)
    for body in get_messages():
        print(body)
if __name__ == "__main__":
    # Take the topic name from the first command-line argument when one is
    # given; otherwise prompt for it.
    topic_to_listen = sys.argv[1] if len(sys.argv) >= 2 else input("Topic Name:")
    main(topic_to_listen)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment