Skip to content

Instantly share code, notes, and snippets.

@mattbennett
Last active July 7, 2022 19:38
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save mattbennett/30cc95d42346df62a60e to your computer and use it in GitHub Desktop.
Save mattbennett/30cc95d42346df62a60e to your computer and use it in GitHub Desktop.
Greenthread worker
import eventlet
eventlet.monkey_patch()
from eventlet.greenpool import GreenPile
from kombu.pools import producers
from kombu import Exchange, Queue
# Direct exchange and the one queue bound to it under the routing key 'queue'.
exchange = Exchange(name='exchange', type='direct')
queue = Queue(name='queue', exchange=exchange, routing_key='queue')
if __name__ == '__main__':
    from kombu import Connection

    connection = Connection('amqp://guest:guest@localhost:5672//')

    def publish(index):
        """Publish ``index`` as a JSON message routed to ``queue``.

        The message goes through the module-level ``exchange`` and the queue
        is declared first, so nothing is silently dropped when the producer
        runs before any consumer has created the queue.
        """
        # block=True: wait for a free producer rather than failing when the
        # pool is exhausted by the other greenthreads.
        with producers[connection].acquire(block=True) as producer:
            producer.publish(
                index,
                exchange=exchange,
                routing_key='queue',
                serializer='json',
                declare=[queue],
            )

    pile = GreenPile()
    # range() rather than xrange(): identical for 100 items and also valid
    # on Python 3.
    for index in range(100):
        pile.spawn(publish, index)
    # Iterating the pile waits until every spawned publish has finished.
    list(pile)
import eventlet
eventlet.monkey_patch()
import random
import time
import threading
from kombu.mixins import ConsumerMixin
from kombu.log import get_logger
from kombu import Exchange, Queue
# Direct exchange and the one queue bound to it under the routing key 'queue'.
exchange = Exchange(name='exchange', type='direct')
queue = Queue(name='queue', exchange=exchange, routing_key='queue')

# Module-level logger obtained through kombu's logging helper.
logger = get_logger(__name__)
class Worker(ConsumerMixin):
    """Consume JSON messages from ``queue``, handling each in a greenthread.

    ``handle_message`` only logs the delivery and spawns ``process_task``;
    the message is acknowledged later, once the task has finished.
    """

    def __init__(self, connection):
        """Store the broker connection the mixin will consume from."""
        # ConsumerMixin expects the broker connection on ``self.connection``.
        self.connection = connection

    def handle_message(self, body, message):
        """Consumer callback: log the delivery and spawn a task for it."""
        # One pre-formatted string prints identically on Python 2 and 3, and
        # ``.name`` works on standard threads as well as eventlet's wrappers
        # (``get_name()`` is not part of the stdlib Thread API).
        print("CONSUME %s in %s" % (body, threading.current_thread().name))
        eventlet.spawn(self.process_task, body, message)

    def get_consumers(self, Consumer, channel):
        """Return the single consumer bound to ``queue``."""
        consumer = Consumer(
            queues=[queue],
            accept=['json'],
            callbacks=[self.handle_message],
        )
        # Acks happen in process_task, so the prefetch count caps how many
        # unacknowledged messages (and thus tasks) are in flight at once.
        consumer.qos(prefetch_count=10)
        return [consumer]

    def process_task(self, body, message):
        """Simulate up to a second of work, then acknowledge the message."""
        time.sleep(random.random())
        print("ACK %s in %s" % (body, threading.current_thread().name))
        message.ack()
if __name__ == '__main__':
    from kombu import Connection
    from kombu.utils.debug import setup_logging

    # Configure the root logger (named '') at INFO level.
    setup_logging(loglevel='INFO', loggers=[''])

    broker_url = 'amqp://guest:guest@localhost:5672//'
    with Connection(broker_url) as broker:
        try:
            # Run the consumer loop in a greenthread and wait on it until it
            # ends or the user interrupts.
            consumer_thread = eventlet.spawn(Worker(broker).run)
            consumer_thread.wait()
        except KeyboardInterrupt:
            print('bye bye')
@tiffanylam2010
Copy link

RabbitMQ recommends using ONE Connection with multiple Channels.
Why not use a ChannelPool instead of a ConnectionPool in producers?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment