Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save timonwong/238f2379d1acbbd4ef0a to your computer and use it in GitHub Desktop.
Save timonwong/238f2379d1acbbd4ef0a to your computer and use it in GitHub Desktop.
Demonstrate that RabbitMQ consumer will get blocked after all connections exhausted
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import logging
import threading
import time
import kombu.common
import kombu.pools
import base
import config
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO)
LOG = logging.getLogger(os.path.basename(__file__))
def producer_proc(connection):
with kombu.pools.producers[connection].acquire(block=True) as producer:
while True:
# LOG.info("Publishing message...")
producer.publish('ping', routing_key='test.msgs')
def consumer_proc(connection):
def process_msg(body, message):
LOG.info("Received message: %s", body)
message.ack()
with connection.Consumer(queues=config.DEFAULT_QUEUE, callbacks=[process_msg]):
while True:
connection.drain_events()
def main():
producer_conn = config.get_connection()
consumer_conn = config.get_connection()
producer_thread = threading.Thread(target=producer_proc,
args=(producer_conn,))
consumer_thread = threading.Thread(target=consumer_proc,
args=(consumer_conn,))
producer_thread.daemon = True
consumer_thread.daemon = True
producer_thread.start()
consumer_thread.start()
time.sleep(0.5)
connections = []
while True:
LOG.info("Try to establish a new rabbit connection...")
connection = config.get_connection()
connection.connect()
connections.append(connection)
LOG.info("[%d] connections", len(connections))
if __name__ == '__main__':
base.install_cry_handler()
main()
# -*- coding: utf-8 -*-
import signal
import sys
import threading
import traceback
from pprint import pprint
from StringIO import StringIO
def cry():
"""Return stacktrace of all active threads.
From https://gist.github.com/737056
"""
tmap = {}
main_thread = None
# get a map of threads by their ID so we can print their names
# during the traceback dump
for t in threading.enumerate():
if getattr(t, "ident", None):
tmap[t.ident] = t
else:
main_thread = t
out = StringIO()
sep = "=" * 49 + "\n"
for tid, frame in sys._current_frames().iteritems():
thread = tmap.get(tid, main_thread)
out.write("%s\n" % (thread.getName(), ))
out.write(sep)
traceback.print_stack(frame, file=out)
out.write(sep)
out.write("LOCAL VARIABLES\n")
out.write(sep)
pprint(frame.f_locals, stream=out)
out.write("\n\n")
return out.getvalue()
def install_cry_handler():
# Jython/PyPy does not have sys._current_frames
is_jython = sys.platform.startswith("java")
is_pypy = hasattr(sys, "pypy_version_info")
if not (is_jython or is_pypy):
def cry_handler(signum, frame):
"""Signal handler logging the stacktrace of all active threads."""
sys.stderr.write("\n" + cry())
signal.signal(signal.SIGUSR1, cry_handler)
# -*- coding: utf-8 -*-
import kombu
AMQP_URI = 'amqp://guest:guest@10.10.0.70:5672//'
DEFAULT_EXCHANGE = kombu.Exchange('test.msgs', type='topic')
DEFAULT_QUEUE = kombu.Queue('test.msgs', DEFAULT_EXCHANGE, routing_key='test.msgs')
def get_connection():
return kombu.BrokerConnection(AMQP_URI)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment