Created
May 9, 2019 11:36
-
-
Save tilsche/1c0e0141eed018cc5e2df4f75c8617d5 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env bash | |
set -e | |
PYTHON37= | |
PYTHON36= | |
AIORMQ= | |
AIO_PIKA= | |
MSG=100000 | |
URL=amqp://localhost | |
OUT=test.csv | |
AIORMQ_VERSIONS="d37c9c7 b7b17bf 5ebdf72" | |
echo "library,version,messages,time,rate" > "$OUT" | |
function run_test { | |
echo "### Running $LIB $VERSION" | |
$PYTHON37 ./test-pika-publish.py $URL $MSG | |
RESULT=$($PYTHON ./test-$LIB-consume.py $URL $MSG) | |
echo "### $RESULT" | |
echo "$LIB,$LIB-$VERSION,$RESULT" >> "$OUT" | |
} | |
for I in {1..6}; do | |
echo "Iteration $I" | |
LIB="pika" | |
PYTHON="$PYTHON37" | |
VERSION="1.0.1" | |
run_test | |
LIB="aio-pika" | |
PYTHON="$PYTHON36" | |
VERSION="2.8.3" | |
run_test | |
LIB="aio-pika" | |
PYTHON="$PYTHON37" | |
AIO_PIKA_VERSION="3e2f02b" | |
for AIORMQ_VERSION in $AIORMQ_VERSIONS; do | |
(cd $AIORMQ; git checkout $AIORMQ_VERSION) | |
VERSION="$AIO_PIKA_VERSION-aiormq-$AIORMQ_VERSION" | |
run_test | |
done | |
LIB="aiormq" | |
PYTHON="$PYTHON37" | |
for VERSION in $AIORMQ_VERSIONS; do | |
(cd $AIORMQ; git checkout $VERSION) | |
run_test | |
done | |
done |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import time | |
import aio_pika | |
import click | |
async def amain(url, message_count): | |
body = b'Hi' | |
connection = await aio_pika.connect_robust(url) | |
channel = await connection.channel() | |
await channel.set_qos(prefetch_count=400) | |
queue = await channel.declare_queue('test-queue') | |
exchange = await channel.declare_exchange('test.exchange', type=aio_pika.ExchangeType.TOPIC) | |
recieved_message_count = 0 | |
all_done = asyncio.Event() | |
async def on_message(message: aio_pika.Message): | |
nonlocal recieved_message_count | |
recieved_message_count += 1 | |
with message.process(requeue=True): | |
assert message.body == body | |
if recieved_message_count == message_count: | |
all_done.set() | |
consume_begin = time.monotonic() | |
tag = await queue.consume(on_message) | |
await all_done.wait() | |
consume_end = time.monotonic() | |
consume_duration = consume_end - consume_begin | |
print(f'{recieved_message_count},{consume_duration},{recieved_message_count / consume_duration}') | |
await queue.cancel(tag) | |
await queue.delete() | |
await exchange.delete() | |
await channel.close() | |
await connection.close() | |
@click.command() | |
@click.argument('url', default='amqp://localhost/') | |
@click.argument('message_count', default=10000) | |
def main(url, message_count): | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(amain(url, message_count)) | |
loop.close() | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import time | |
import aio_pika | |
import click | |
async def amain(url, message_count): | |
body = b'Hi' | |
connection = await aio_pika.connect_robust(url) | |
channel = await connection.channel() | |
await channel.set_qos(prefetch_count=400) | |
queue = await channel.declare_queue('test-queue') | |
exchange = await channel.declare_exchange('test.exchange', type=aio_pika.ExchangeType.TOPIC) | |
await queue.bind(exchange=exchange, routing_key='test') | |
publish_begin = time.monotonic() | |
for i in range(message_count): | |
await exchange.publish(message=aio_pika.Message(body=body), routing_key='test') | |
publish_end = time.monotonic() | |
publish_duration = publish_end - publish_begin | |
print(f'{message_count},{publish_duration},{message_count/publish_duration}') | |
@click.command() | |
@click.argument('url', default='amqp://localhost/') | |
@click.argument('message_count', default=10000) | |
def main(url, message_count): | |
asyncio.run(amain(url, message_count)) | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import asyncio | |
import time | |
import aiormq | |
import click | |
async def amain(url, message_count): | |
body = b'Hi' | |
exchange = 'test.exchange' | |
queue = 'test-queue' | |
connection = aiormq.Connection(url) | |
await connection.connect() | |
channel = await connection.channel() | |
await channel.basic_qos(prefetch_count=400) | |
deaclare_ok = await channel.queue_declare(queue) | |
all_done = asyncio.Event() | |
async def on_message(message): | |
nonlocal recieved_message_count | |
recieved_message_count += 1 | |
assert body == body | |
await message.channel.basic_ack(message.delivery.delivery_tag) | |
if message_count == recieved_message_count: | |
all_done.set() | |
recieved_message_count = 0 | |
consume_begin = time.monotonic() | |
await channel.basic_consume(deaclare_ok.queue, on_message) | |
await all_done.wait() | |
consume_end = time.monotonic() | |
consume_duration = consume_end - consume_begin | |
print(f'{recieved_message_count},{consume_duration},{recieved_message_count / consume_duration}') | |
await channel.queue_delete(queue) | |
await channel.exchange_delete(exchange) | |
await connection.close() | |
@click.command() | |
@click.argument('url', default='amqp://localhost/') | |
@click.argument('message_count', default=10000) | |
def main(url, message_count): | |
asyncio.run(amain(url, message_count)) | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import time | |
import pika | |
import click | |
@click.command() | |
@click.argument('url', default='amqp://localhost/') | |
@click.argument('message_count', default=10000) | |
def main(url, message_count): | |
body = b'Hi' | |
exchange = 'test.exchange' | |
queue = 'test-queue' | |
routing_key = 'test' | |
parameters = pika.URLParameters(url) | |
connection = pika.BlockingConnection(parameters) | |
channel = connection.channel() | |
channel.confirm_delivery() | |
channel.basic_qos(prefetch_count=400) | |
def on_message(channel, method_frame, header_frame, message_body): | |
nonlocal recieved_message_count | |
recieved_message_count += 1 | |
assert body == message_body | |
channel.basic_ack(delivery_tag=method_frame.delivery_tag) | |
if message_count == recieved_message_count: | |
raise StopIteration() | |
recieved_message_count = 0 | |
consume_begin = time.monotonic() | |
channel.basic_consume(queue, on_message) | |
try: | |
channel.start_consuming() | |
except StopIteration: | |
channel.stop_consuming() | |
consume_end = time.monotonic() | |
consume_duration = consume_end - consume_begin | |
print(f'{recieved_message_count},{consume_duration},{recieved_message_count / consume_duration}') | |
channel.queue_delete(queue) | |
channel.exchange_delete(exchange) | |
connection.close() | |
if __name__ == '__main__': | |
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import time | |
import pika | |
import click | |
@click.command() | |
@click.argument('url', default='amqp://localhost/') | |
@click.argument('message_count', default=10000) | |
def main(url, message_count): | |
body = b'Hi' | |
exchange = 'test.exchange' | |
queue = 'test-queue' | |
routing_key = 'test' | |
parameters = pika.URLParameters(url) | |
connection = pika.BlockingConnection(parameters) | |
channel = connection.channel() | |
#channel.confirm_delivery() | |
channel.basic_qos(prefetch_count=400) | |
channel.exchange_declare(exchange, 'topic') | |
channel.queue_declare(queue) | |
channel.queue_bind(queue, exchange, routing_key) | |
publish_begin = time.monotonic() | |
for i in range(message_count): | |
channel.basic_publish(exchange, routing_key, body) | |
publish_end = time.monotonic() | |
publish_duration = publish_end - publish_begin | |
print(f'{message_count},{publish_duration},{message_count/publish_duration}') | |
connection.close() | |
if __name__ == '__main__': | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment