Last active
October 9, 2018 06:30
-
-
Save doseeing/0d02ae0157b6236b97e400aa270d91c1 to your computer and use it in GitHub Desktop.
aiokafka demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from aiokafka import AIOKafkaConsumer | |
import asyncio | |
import tracemalloc | |
# Start recording allocations right away so that snapshots taken later
# include everything allocated after this point.
tracemalloc.start()

# Connection settings for the Kafka broker used by this demo.
kafka_host = '127.0.0.1'
kafka_port = 9092
kafka_consumer_group = 'mygroup-id'
async def consume(topic, group_id):
    """Consume ``topic`` and report memory statistics for every message.

    A consumer is created against the broker at ``kafka_host:kafka_port``
    and iterated until the iteration ends or raises; ``memory_check()`` is
    called once per received message. The consumer is always stopped on the
    way out.

    Args:
        topic: Name of the Kafka topic to subscribe to.
        group_id: Consumer group id handed to ``AIOKafkaConsumer``.
    """
    kafka_consumer = AIOKafkaConsumer(
        topic,
        loop=asyncio.get_event_loop(),
        bootstrap_servers="{}:{}".format(kafka_host, kafka_port),
        group_id=group_id,
    )
    # Get the cluster layout and join the consumer group `group_id`.
    await kafka_consumer.start()
    try:
        # The message payload is irrelevant here; each message only
        # triggers a memory report.
        async for _message in kafka_consumer:
            memory_check()
    finally:
        # Will leave consumer group; perform autocommit if enabled.
        await kafka_consumer.stop()
def memory_check():
    """Print the ten largest allocation sites reported by tracemalloc.

    Does nothing when tracemalloc is not tracing. Otherwise a snapshot is
    taken, grouped by source line, and the first ten statistics are printed
    as strings inside a dict under the key ``'top_10_trace'``.

    Technique taken from:
    https://tech.gadventures.com/hunting-for-memory-leaks-in-asyncio-applications-3614182efaf7
    """
    if not tracemalloc.is_tracing():
        return

    stats_by_line = tracemalloc.take_snapshot().statistics('lineno')
    report = {'top_10_trace': list(map(str, stats_by_line[:10]))}
    print(report)
if __name__ == '__main__':
    # Script entry point: consume `my_topic` until interrupted, printing a
    # tracemalloc report for every message received.
    topic = 'my_topic'

    # To reproduce the "group id None" measurements recorded below, pass
    # group_id=None instead of kafka_consumer_group.

    # Recorded reference output (kept as a no-op string literal).
    '''
    output aiokafka v0.4.1 group id None
    {'top_10_trace': ['/usr/lib/python3.6/tracemalloc.py:65: size=99.8 KiB, count=1420, average=72 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:124: size=7632 B, count=18, average=424 B', '/usr/lib/python3.6/asyncio/events.py:145: size=7104 B, count=21, average=338 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:128: size=5648 B, count=12, average=471 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:169: size=2928 B, count=15, average=195 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/aiokafka/client.py:506: size=2848 B, count=38, average=75 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:162: size=2784 B, count=6, average=464 B', '/usr/lib/python3.6/asyncio/tasks.py:313: size=2336 B, count=13, average=180 B', '/usr/lib/python3.6/asyncio/base_events.py:276: size=1776 B, count=17, average=104 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/aiokafka/conn.py:138: size=1664 B, count=3, average=555 B']}
    output aiokafka v0.4.1 group id mygroup-id
    {'top_10_trace': ['/usr/lib/python3.6/tracemalloc.py:65: size=78.0 KiB, count=1109, average=72 B', '/usr/lib/python3.6/asyncio/base_events.py:558: size=10008 B, count=86, average=116 B', '/usr/lib/python3.6/asyncio/events.py:145: size=7952 B, count=32, average=248 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:124: size=7552 B, count=17, average=444 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/aiokafka/conn.py:219: size=4736 B, count=8, average=592 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:169: size=4624 B, count=29, average=159 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:128: size=4256 B, count=10, average=426 B', '/usr/lib/python3.6/asyncio/tasks.py:313: size=3488 B, count=24, average=145 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/aiokafka/client.py:506: size=2920 B, count=39, average=75 B', '/usr/lib/python3.6/asyncio/base_events.py:276: size=2720 B, count=25, average=109 B']}
    output aiokafka v0.4.2 group id None
    {'top_10_trace': ['/usr/lib/python3.6/tracemalloc.py:65: size=90.0 KiB, count=1280, average=72 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:136: size=15.6 KiB, count=126, average=127 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:132: size=8112 B, count=19, average=427 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:177: size=6960 B, count=78, average=89 B', '/usr/lib/python3.6/asyncio/events.py:145: size=6664 B, count=20, average=333 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:170: size=3248 B, count=7, average=464 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/aiokafka/client.py:509: size=2920 B, count=39, average=75 B', '/usr/lib/python3.6/asyncio/tasks.py:313: size=2336 B, count=13, average=180 B', '/usr/lib/python3.6/asyncio/base_events.py:276: size=1904 B, count=19, average=100 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/aiokafka/conn.py:138: size=1664 B, count=3, average=555 B']}
    output aiokafka v0.4.2 group id mygroup-id, notice the task allocations rise
    {'top_10_trace': ['/usr/lib/python3.6/asyncio/tasks.py:688: size=123 KiB, count=927, average=136 B', '/usr/lib/python3.6/asyncio/base_events.py:276: size=104 KiB, count=953, average=112 B', '/usr/lib/python3.6/tracemalloc.py:498: size=55.2 KiB, count=883, average=64 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/aiokafka/consumer/group_coordinator.py:277: size=43.5 KiB, count=927, average=48 B', '/usr/lib/python3.6/tracemalloc.py:65: size=34.8 KiB, count=495, average=72 B', '/usr/lib/python3.6/tracemalloc.py:180: size=23.6 KiB, count=432, average=56 B', '/usr/lib/python3.6/asyncio/events.py:145: size=7952 B, count=32, average=248 B', '/usr/lib/python3.6/asyncio/tasks.py:704: size=7920 B, count=1, average=7920 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:132: size=7504 B, count=16, average=469 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:177: size=4752 B, count=31, average=153 B']}
    '''

    # Drive the coroutine directly instead of ensure_future() + run_forever():
    # an exception raised by consume() (e.g. broker unreachable) now
    # propagates and ends the process rather than leaving the loop idling
    # forever with an unretrieved task exception.
    loop = asyncio.get_event_loop()
    loop.run_until_complete(consume(topic, group_id=kafka_consumer_group))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment