@dexterous
Last active January 13, 2024 09:14
Some people just don't get how to do concurrency/parallelism right. (sigh!)
import boto3
import sys
import Queue
import threading
work_queue = Queue.Queue()
sqs = boto3.resource('sqs')
from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)
from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)
def process_queue():
    while True:
        messages = work_queue.get()
        bodies = list()
        for i in range(0, len(messages)):
            bodies.append({'Id': str(i+1), 'MessageBody': messages[i].body})
            #to_q.send_messages(Entries=bodies)
            to_q.send_message(MessageBody=messages[i].body, MessageAttributes={
                'contentType': {
                    'StringValue': 'application/json',
                    'DataType': 'String'
                }
            })
        for message in messages:
            print("Copied " + str(message.body))
            message.delete()
for i in range(10):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()
while True:
    messages = list()
    for message in from_q.receive_messages(
            MaxNumberOfMessages=1,
            VisibilityTimeout=30,
            WaitTimeSeconds=20):
        messages.append(message)
    work_queue.put(messages)
work_queue.join()
@dexterous
Author

I guess line 27 is misaligned.
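For comparison, here is a minimal sketch of the same worker-pool shape with the queue bookkeeping done properly. As posted, the script calls `work_queue.join()` but never `work_queue.task_done()`, so the join can never complete. The sketch below is an assumption-laden illustration, not a drop-in fix: it targets Python 3 (`queue` rather than `Queue`), and `run_pool`, `SENTINEL`, and `copy_message` are hypothetical names. `copy_message` stands in for the SQS `send_message()` and `delete()` calls so the example runs without AWS.

```python
import queue
import threading

SENTINEL = None  # hypothetical shutdown marker, not part of the original script


def run_pool(batches, handle, num_workers=4):
    """Feed batches to worker threads and return once every batch is handled."""
    work_queue = queue.Queue()

    def worker():
        while True:
            batch = work_queue.get()
            try:
                if batch is SENTINEL:
                    return  # the finally clause still runs before the thread exits
                for item in batch:  # an empty batch is simply a no-op
                    handle(item)
            finally:
                # Every get() is matched by a task_done(), so join() can return.
                work_queue.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()

    for batch in batches:
        work_queue.put(batch)
    for _ in threads:
        work_queue.put(SENTINEL)  # one shutdown marker per worker

    work_queue.join()  # blocks until task_done() has been called for every put()
    for t in threads:
        t.join()


copied = []
lock = threading.Lock()


def copy_message(body):
    # Stand-in (assumption) for to_q.send_message(...) followed by message.delete()
    with lock:
        copied.append(body)


run_pool([["a", "b"], [], ["c"]], copy_message)
print(sorted(copied))  # ['a', 'b', 'c']
```

The sentinel values let the workers exit cleanly instead of relying on daemon threads being killed at interpreter shutdown. The sketch assumes `handle` does not raise. If it can, the worker would need to catch the exception, otherwise a dead thread leaves a sentinel unconsumed and `join()` hangs.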
