
@rlcarrca created this gist April 16, 2016 20:29
Multiprocessing sample with RabbitMQ
#!/usr/bin/env python
"""Publisher: send one persistent message to the 'hello' queue."""
import sys

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)

message = ' '.join(sys.argv[1:]) or "Hello World!"
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print " [x] Sent %r" % message
connection.close()
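A quick way to exercise the worker pool below (a sketch of my own, not part of the original gist): the consumer sleeps one second per '.' in a message body, so publishing a handful of messages with varying dot counts makes the load sharing visible. The loop reuses the same publish call as the script above.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello', durable=True)
for i in xrange(10):
    # e.g. 'task 3 ...' makes whichever worker picks it up sleep for 3 seconds
    body = 'task %d %s' % (i, '.' * (i % 4))
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=body,
                          properties=pika.BasicProperties(delivery_mode=2))
print " [x] Sent 10 test messages"
connection.close()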
#!/usr/bin/env python
"""
Create multiple RabbitMQ connections from a single thread, using Pika and
multiprocessing.Pool.

Based on tutorial 2 (http://www.rabbitmq.com/tutorials/tutorial-two-python.html).
"""
import multiprocessing
import time

import pika


def callback(ch, method, properties, body):
    # Simulate work: sleep one second per '.' in the message body, then ack.
    print " [x] %r received %r" % (multiprocessing.current_process(), body)
    time.sleep(body.count('.'))
    ch.basic_ack(delivery_tag=method.delivery_tag)


def consume():
    # Each worker process opens its own connection and channel.
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello', durable=True)  # must match the publisher's queue
    channel.basic_qos(prefetch_count=1)  # fair dispatch: one unacked message per worker
    # pika < 1.0 signature; pika >= 1.0 uses basic_consume(queue=..., on_message_callback=...)
    channel.basic_consume(callback, queue='hello')
    print ' [*] Waiting for messages. To exit press CTRL+C'
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        pass


workers = 5
pool = multiprocessing.Pool(processes=workers)
for i in xrange(workers):
    pool.apply_async(consume)

# Stay alive so the daemonic pool workers keep running
try:
    while True:
        time.sleep(1)  # sleep instead of busy-waiting
except KeyboardInterrupt:
    print ' [*] Exiting...'
    pool.terminate()
    pool.join()
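Pool.apply_async is designed for finite tasks, so a common alternative for long-running consumers is to start plain multiprocessing.Process workers and join them, which removes the need for the keep-alive loop. A minimal sketch of that variant, assuming the consume() function from the worker script above (my own illustration, not part of the gist):

import multiprocessing

# Assumes consume() as defined in the worker script above.
if __name__ == '__main__':
    processes = [multiprocessing.Process(target=consume) for _ in xrange(5)]
    for p in processes:
        p.start()
    try:
        for p in processes:
            p.join()  # blocks here; each worker consumes until interrupted
    except KeyboardInterrupt:
        print ' [*] Exiting...'
        for p in processes:
            p.terminate()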
@leloss commented Aug 15, 2020

I'm happy to see an example of RabbitMQ with multiprocessing.
Have you tried implementing this approach with a shared connection?

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
...
for i in xrange(0, workers):
    pool.apply_async(consume, args=(connection, ))
...
def consume(connection):
    channel = connection.channel()
    ...

I can't get it to work. Would appreciate some insight! Thanks!

@ionutbaltariu

For other people who might want to try sharing a connection between processes or threads: that is neither recommended nor supported by pika (source).
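For completeness, a minimal sketch of the pattern that does work (my own illustration, not from the gist author): pass only plain connection settings to each worker and let every process open its own BlockingConnection and channel.

import multiprocessing
import time

import pika

def callback(ch, method, properties, body):
    print " [x] %r received %r" % (multiprocessing.current_process(), body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

def consume(host):
    # Only the plain host string crosses the process boundary; the connection
    # and channel are created inside the worker process itself.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    channel = connection.channel()
    channel.queue_declare(queue='hello', durable=True)
    channel.basic_consume(callback, queue='hello')
    channel.start_consuming()

workers = 5
pool = multiprocessing.Pool(processes=workers)
for i in xrange(workers):
    pool.apply_async(consume, args=('localhost',))

# Keep the parent alive, as in the gist above, so the pool workers keep running.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    pool.terminate()
    pool.join()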
