Skip to content

Instantly share code, notes, and snippets.

@joshghent
Created January 14, 2020 10:29
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save joshghent/ca4a1272031e2a52af57d5e8ec5d53c5 to your computer and use it in GitHub Desktop.
Save joshghent/ca4a1272031e2a52af57d5e8ec5d53c5 to your computer and use it in GitHub Desktop.
A small script to push messages from SQS to SNS
# Usage
# $ python sqs_to_sns.py my-queue-name
import boto3
import sys
import queue
import threading
from datetime import datetime
import json
from uuid import uuid4
work_queue = queue.Queue()
sqs = boto3.resource('sqs')
sns = boto3.client('sns')
sqs_client = boto3.client('sqs')
from_q_name = sys.argv[1]
print(("From: " + from_q_name + " To: SNS"))
from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName='backup-queue')
skipped = 0
processed = 0
total = 0
def process_queue():
while True:
messages = work_queue.get()
global total
total = len(messages)
for message in messages:
message_content = json.loads(message.body)
message.delete()
print("Backing up Message to dead letter queue - just in case. Id: " +
message_content['MessageId'])
bodies = list()
bodies.append({'Id': str(uuid4()), 'MessageBody': message.body})
to_q.send_messages(Entries=bodies)
response = sns.publish(
TopicArn=message_content['TopicArn'], Message=message_content['Message'])
print(("Published Message to Topic " + str(message_content['MessageId']) +
". To TopicArn: " + message_content['TopicArn'] + ". Received response " + json.dumps(response)))
global processed
processed = processed + 1
for i in range(10):
t = threading.Thread(target=process_queue)
t.daemon = True
t.start()
while True:
messages = list()
for message in from_q.receive_messages(
MaxNumberOfMessages=10,
VisibilityTimeout=123,
WaitTimeSeconds=20):
messages.append(message)
work_queue.put(messages)
work_queue.join()
print("Processed " + str(processed) + " of " + str(total) +
". Skipped " + str(skipped) + " messages. Exiting")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment