Skip to content

Instantly share code, notes, and snippets.

@Gsantomaggio
Last active September 13, 2019 13:11
Show Gist options
  • Save Gsantomaggio/00284821aa913027b3169b89cc776121 to your computer and use it in GitHub Desktop.
Save Gsantomaggio/00284821aa913027b3169b89cc776121 to your computer and use it in GitHub Desktop.
import pika
import time
import threading
import _thread
# Shared login used for every broker node below.
credentials = pika.PlainCredentials('test', 'test')

# One blocking connection per node, opened in the order .10, .11, .12.
connection10, connection11, connection12 = (
    pika.BlockingConnection(
        pika.ConnectionParameters(host=node_host, credentials=credentials)
    )
    for node_host in ('20.0.0.10', '20.0.0.11', '20.0.0.12')
)

# Queue names are built as queue_name + "_<node>_" + <index>.
queue_name = "a_"
# Upper bound (exclusive) of the 1-based queue index range used below.
q_numbers = 5
def declare_queues():
    """Declare the durable quorum queues on each of the three connections.

    Opens one channel per module-level connection, then for every index in
    ``range(1, q_numbers)`` declares ``queue_name + "_<node>_" + str(index)``
    on the matching channel, i.e. ``q_numbers - 1`` queues per connection.
    """
    # Channels are opened up front, in the order 10, 11, 12.
    tagged_channels = [
        (conn.channel(), tag)
        for conn, tag in (
            (connection10, "_10_"),
            (connection11, "_11_"),
            (connection12, "_12_"),
        )
    ]
    for index in range(1, q_numbers):
        for channel, tag in tagged_channels:
            channel.queue_declare(
                queue=queue_name + tag + str(index),
                arguments={'x-queue-type': 'quorum'},
                durable=True,
            )
def callback(ch, method, properties, body):
    """Print a delivered message with the current thread's name, then ack it.

    Args:
        ch: channel the message arrived on; its ``basic_ack`` is called.
        method: delivery metadata; only ``delivery_tag`` is read.
        properties: message properties (unused).
        body: message payload, printed with ``%r``.
    """
    # current_thread().name replaces the deprecated currentThread().getName()
    # aliases; the printed text is the same.
    thread_name = threading.current_thread().name
    print(" [x] %r %s" % (body, thread_name))
    ch.basic_ack(delivery_tag=method.delivery_tag)
def start_consumers(connection, index, node="10"):
    """Consume from one queue on a fresh channel; blocks the calling thread.

    Args:
        connection: connection on which a new channel is opened.
        index: numeric suffix of the queue name.
        node: node tag in the middle of the queue name. Defaults to "10",
            the value that used to be hard-coded, so existing two-argument
            calls consume from the same queue as before. Pass "11" or "12"
            to consume the other queues that declare_queues creates.

    The consumed queue is ``queue_name + "_<node>_" + str(index)``; messages
    are handed to ``callback`` and must be acknowledged explicitly.
    """
    channel = connection.channel()
    target_queue = queue_name + "_" + str(node) + "_" + str(index)
    # NOTE(review): a positional callback plus ``no_ack`` looks like the
    # pre-1.0 pika signature; newer releases appear to expect
    # basic_consume(queue, on_message_callback, auto_ack=...) - confirm
    # against the installed pika version.
    channel.basic_consume(callback,
                          queue=target_queue,
                          no_ack=False)
    # Blocks here until consuming is stopped.
    channel.start_consuming()
### Here I'd like to have different threads, one per consumer.
### 'start_consuming' is blocking, so I am looking for a way to
### use different threads or to make it work somehow.
### The current code does not work, as you can imagine. The question is:
### is there a way to have different consumers (with the same connection)
### that can work in "pseudo-parallel", even sharing the same thread?
### The consumers spend 90% of their time idle, so some sort of green thread
### could also work.
def consumers():
    """Spawn one background consumer thread per (queue index, connection).

    For every index in ``range(1, q_numbers)`` a thread running
    ``start_consumers(connection, index)`` is started for connection10,
    connection11 and connection12, in that order. Returns as soon as the
    threads are started; it does not wait for them.
    """
    # NOTE(review): each connection ends up shared by several threads. The
    # pika documentation describes its connections as not thread-safe -
    # confirm, and consider one connection per thread.
    for index in range(1, q_numbers):
        for connection in (connection10, connection11, connection12):
            # threading.Thread replaces the obsolete _thread.start_new alias.
            # daemon=True keeps the old behaviour: the threads do not keep
            # the process alive once the main thread finishes.
            threading.Thread(
                target=start_consumers,
                args=(connection, index),
                daemon=True,
            ).start()
# Queue declaration is disabled; uncomment the next line to declare the queues
# before the consumers start.
# declare_queues()
consumers()
# consumers() returns as soon as its threads are started, so this pause is what
# keeps the script running; the process exits roughly two seconds later.
time.sleep(2)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment