Skip to content

Instantly share code, notes, and snippets.

@tilsche
Created May 9, 2019 11:36
Show Gist options
  • Save tilsche/1c0e0141eed018cc5e2df4f75c8617d5 to your computer and use it in GitHub Desktop.
Save tilsche/1c0e0141eed018cc5e2df4f75c8617d5 to your computer and use it in GitHub Desktop.
#!/usr/bin/env bash
set -e
PYTHON37=
PYTHON36=
AIORMQ=
AIO_PIKA=
MSG=100000
URL=amqp://localhost
OUT=test.csv
AIORMQ_VERSIONS="d37c9c7 b7b17bf 5ebdf72"
echo "library,version,messages,time,rate" > "$OUT"
function run_test {
echo "### Running $LIB $VERSION"
$PYTHON37 ./test-pika-publish.py $URL $MSG
RESULT=$($PYTHON ./test-$LIB-consume.py $URL $MSG)
echo "### $RESULT"
echo "$LIB,$LIB-$VERSION,$RESULT" >> "$OUT"
}
for I in {1..6}; do
echo "Iteration $I"
LIB="pika"
PYTHON="$PYTHON37"
VERSION="1.0.1"
run_test
LIB="aio-pika"
PYTHON="$PYTHON36"
VERSION="2.8.3"
run_test
LIB="aio-pika"
PYTHON="$PYTHON37"
AIO_PIKA_VERSION="3e2f02b"
for AIORMQ_VERSION in $AIORMQ_VERSIONS; do
(cd $AIORMQ; git checkout $AIORMQ_VERSION)
VERSION="$AIO_PIKA_VERSION-aiormq-$AIORMQ_VERSION"
run_test
done
LIB="aiormq"
PYTHON="$PYTHON37"
for VERSION in $AIORMQ_VERSIONS; do
(cd $AIORMQ; git checkout $VERSION)
run_test
done
done
#!/usr/bin/env python3
import asyncio
import time
import aio_pika
import click
async def amain(url, message_count):
body = b'Hi'
connection = await aio_pika.connect_robust(url)
channel = await connection.channel()
await channel.set_qos(prefetch_count=400)
queue = await channel.declare_queue('test-queue')
exchange = await channel.declare_exchange('test.exchange', type=aio_pika.ExchangeType.TOPIC)
recieved_message_count = 0
all_done = asyncio.Event()
async def on_message(message: aio_pika.Message):
nonlocal recieved_message_count
recieved_message_count += 1
with message.process(requeue=True):
assert message.body == body
if recieved_message_count == message_count:
all_done.set()
consume_begin = time.monotonic()
tag = await queue.consume(on_message)
await all_done.wait()
consume_end = time.monotonic()
consume_duration = consume_end - consume_begin
print(f'{recieved_message_count},{consume_duration},{recieved_message_count / consume_duration}')
await queue.cancel(tag)
await queue.delete()
await exchange.delete()
await channel.close()
await connection.close()
@click.command()
@click.argument('url', default='amqp://localhost/')
@click.argument('message_count', default=10000)
def main(url, message_count):
loop = asyncio.get_event_loop()
loop.run_until_complete(amain(url, message_count))
loop.close()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
import asyncio
import time
import aio_pika
import click
async def amain(url, message_count):
body = b'Hi'
connection = await aio_pika.connect_robust(url)
channel = await connection.channel()
await channel.set_qos(prefetch_count=400)
queue = await channel.declare_queue('test-queue')
exchange = await channel.declare_exchange('test.exchange', type=aio_pika.ExchangeType.TOPIC)
await queue.bind(exchange=exchange, routing_key='test')
publish_begin = time.monotonic()
for i in range(message_count):
await exchange.publish(message=aio_pika.Message(body=body), routing_key='test')
publish_end = time.monotonic()
publish_duration = publish_end - publish_begin
print(f'{message_count},{publish_duration},{message_count/publish_duration}')
@click.command()
@click.argument('url', default='amqp://localhost/')
@click.argument('message_count', default=10000)
def main(url, message_count):
asyncio.run(amain(url, message_count))
if __name__ == '__main__':
main()
#!/usr/bin/env python3
import asyncio
import time
import aiormq
import click
async def amain(url, message_count):
body = b'Hi'
exchange = 'test.exchange'
queue = 'test-queue'
connection = aiormq.Connection(url)
await connection.connect()
channel = await connection.channel()
await channel.basic_qos(prefetch_count=400)
deaclare_ok = await channel.queue_declare(queue)
all_done = asyncio.Event()
async def on_message(message):
nonlocal recieved_message_count
recieved_message_count += 1
assert body == body
await message.channel.basic_ack(message.delivery.delivery_tag)
if message_count == recieved_message_count:
all_done.set()
recieved_message_count = 0
consume_begin = time.monotonic()
await channel.basic_consume(deaclare_ok.queue, on_message)
await all_done.wait()
consume_end = time.monotonic()
consume_duration = consume_end - consume_begin
print(f'{recieved_message_count},{consume_duration},{recieved_message_count / consume_duration}')
await channel.queue_delete(queue)
await channel.exchange_delete(exchange)
await connection.close()
@click.command()
@click.argument('url', default='amqp://localhost/')
@click.argument('message_count', default=10000)
def main(url, message_count):
asyncio.run(amain(url, message_count))
if __name__ == '__main__':
main()
#!/usr/bin/env python3
import time
import pika
import click
@click.command()
@click.argument('url', default='amqp://localhost/')
@click.argument('message_count', default=10000)
def main(url, message_count):
body = b'Hi'
exchange = 'test.exchange'
queue = 'test-queue'
routing_key = 'test'
parameters = pika.URLParameters(url)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.confirm_delivery()
channel.basic_qos(prefetch_count=400)
def on_message(channel, method_frame, header_frame, message_body):
nonlocal recieved_message_count
recieved_message_count += 1
assert body == message_body
channel.basic_ack(delivery_tag=method_frame.delivery_tag)
if message_count == recieved_message_count:
raise StopIteration()
recieved_message_count = 0
consume_begin = time.monotonic()
channel.basic_consume(queue, on_message)
try:
channel.start_consuming()
except StopIteration:
channel.stop_consuming()
consume_end = time.monotonic()
consume_duration = consume_end - consume_begin
print(f'{recieved_message_count},{consume_duration},{recieved_message_count / consume_duration}')
channel.queue_delete(queue)
channel.exchange_delete(exchange)
connection.close()
if __name__ == '__main__':
main()
#!/usr/bin/env python3
import time
import pika
import click
@click.command()
@click.argument('url', default='amqp://localhost/')
@click.argument('message_count', default=10000)
def main(url, message_count):
body = b'Hi'
exchange = 'test.exchange'
queue = 'test-queue'
routing_key = 'test'
parameters = pika.URLParameters(url)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
#channel.confirm_delivery()
channel.basic_qos(prefetch_count=400)
channel.exchange_declare(exchange, 'topic')
channel.queue_declare(queue)
channel.queue_bind(queue, exchange, routing_key)
publish_begin = time.monotonic()
for i in range(message_count):
channel.basic_publish(exchange, routing_key, body)
publish_end = time.monotonic()
publish_duration = publish_end - publish_begin
print(f'{message_count},{publish_duration},{message_count/publish_duration}')
connection.close()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment