Skip to content

Instantly share code, notes, and snippets.

@comzyh
Last active May 7, 2016 19:21
Show Gist options
  • Save comzyh/0262f159f764a748a163f9f13b26578b to your computer and use it in GitHub Desktop.
Save comzyh/0262f159f764a748a163f9f13b26578b to your computer and use it in GitHub Desktop.
aioamqp 0.7 heartbeat bug reproduction
# -*- coding: utf-8 -*-
# @Author: Comzyh
# @Date: 2016-05-07 22:46:41
# @Last Modified by: Comzyh
# @Last Modified time: 2016-05-08 03:12:27
import aioamqp
import asyncio
import logging
import sys
import datetime
@asyncio.coroutine
def on_error(exception):
    """Error callback given to ``aioamqp.connect``: raise what it receives.

    Args:
        exception: The exception object passed to the callback.

    Raises:
        The ``exception`` argument itself, unchanged.
    """
    raise exception
@asyncio.coroutine
def connect():
    """Open an AMQP connection and return a channel set up for the test queue.

    Connects through ``aioamqp.connect`` with ``heartbeat=10`` and
    ``on_error`` as the error callback, then declares the non-durable queue
    ``heartbeat_test`` and binds it to the ``amq.direct`` exchange under the
    routing key ``heartbeat_test``.

    Returns:
        The channel opened on the new connection. The transport and protocol
        objects are not handed back to the caller.
    """
    queue = 'heartbeat_test'
    _transport, protocol = yield from aioamqp.connect(heartbeat=10, on_error=on_error)
    channel = yield from protocol.channel()
    yield from channel.queue_declare(queue_name=queue, durable=False)
    yield from channel.queue_bind(
        queue_name=queue,
        exchange_name='amq.direct',
        routing_key=queue,
    )
    return channel
@asyncio.coroutine
def sender():
    """Publish timestamped test messages in an endless loop.

    Obtains a channel from ``connect()`` and then repeatedly publishes a
    payload containing the current local time to ``amq.direct`` with the
    routing key ``heartbeat_test``, logging after each publish and sleeping
    one second between iterations. The loop has no exit condition.
    """
    channel = yield from connect()
    while True:
        body = 'test_payload: {0}'.format(datetime.datetime.now())
        yield from channel.basic_publish(
            payload=body,
            exchange_name='amq.direct',
            routing_key='heartbeat_test',
        )
        logging.info('payload sent.')
        yield from asyncio.sleep(1)
@asyncio.coroutine
def receiver():
    """Register an aioamqp consumer on ``heartbeat_test`` that logs each body.

    The consumer is registered with ``no_ack=True``. This coroutine finishes
    once ``basic_consume`` has completed; it does not itself wait for
    messages.
    """
    channel = yield from connect()

    @asyncio.coroutine
    def log_body(channel, body, envelope, properties):
        # Only the message body is used; the other arguments are ignored.
        logging.info(body)

    yield from channel.basic_consume(
        queue_name='heartbeat_test',
        callback=log_body,
        no_ack=True,
    )
def pika_receiver():
    """Consume ``heartbeat_test`` with pika's blocking client, logging bodies.

    Connects with default ``pika.ConnectionParameters()``, registers a
    consumer on ``heartbeat_test`` and blocks in ``start_consuming()``.
    Each delivery is logged and then explicitly acknowledged. The connection
    is closed when consuming stops for any reason, including an exception or
    Ctrl-C.
    """
    # Imported lazily so the aioamqp modes work without pika installed.
    import pika

    def _consume(ch, method, properties, body):
        logging.info(body)
        # The consumer is registered without no_ack, so every delivery must
        # be acknowledged; otherwise messages stay unacked on the broker and
        # are redelivered after a reconnect.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    connection = pika.BlockingConnection(pika.ConnectionParameters())
    try:
        channel = connection.channel()
        channel.basic_consume(_consume, queue='heartbeat_test')
        channel.start_consuming()
    finally:
        # Do not leak the socket; skip close() if the broker already dropped
        # the connection so the original exception is not masked.
        if connection.is_open:
            connection.close()
def main():
    """Command-line entry point: run one side of the heartbeat reproduction.

    Usage: ``<script> {sender|receiver|pika_receiver}``.

    Configures root logging to write DEBUG-and-above records to stdout, then
    dispatches on ``sys.argv[1]``. A missing or unrecognised mode is reported
    with ``logging.error`` and the function returns without doing anything
    else.
    """
    logging.basicConfig(stream=sys.stdout, level=logging.DEBUG,
                        format='[%(asctime)s] %(name)s:%(levelname)s: %(message)s')

    modes = ('sender', 'receiver', 'pika_receiver')
    mode = sys.argv[1] if len(sys.argv) > 1 else None
    if mode not in modes:
        prog = sys.argv[0] if sys.argv else '<script>'
        logging.error('usage: %s {%s}', prog, '|'.join(modes))
        return

    if mode == 'pika_receiver':
        # Blocking client: no asyncio event loop is needed for this mode.
        pika_receiver()
        return

    loop = asyncio.get_event_loop()
    if mode == 'sender':
        # sender() loops forever, so this returns only on error or interrupt.
        loop.run_until_complete(sender())
    else:
        # receiver() finishes once the consumer is registered; keep the loop
        # running afterwards so delivery callbacks continue to fire.
        loop.run_until_complete(receiver())
        loop.run_forever()
# Run the command-line entry point only when this file is executed as a
# script, not when it is imported.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment