Skip to content

Instantly share code, notes, and snippets.

@doseeing
Last active October 9, 2018 06:30
Show Gist options
  • Save doseeing/0d02ae0157b6236b97e400aa270d91c1 to your computer and use it in GitHub Desktop.
Save doseeing/0d02ae0157b6236b97e400aa270d91c1 to your computer and use it in GitHub Desktop.
aiokafka demo
from aiokafka import AIOKafkaConsumer
import asyncio
import tracemalloc
tracemalloc.start()
kafka_host, kafka_port, kafka_consumer_group = '127.0.0.1', 9092, 'mygroup-id'
async def consume(topic, group_id):
server_addr = "{}:{}".format(kafka_host, kafka_port)
consumer = AIOKafkaConsumer(
topic,
loop=asyncio.get_event_loop(), bootstrap_servers=server_addr,
group_id=group_id)
# Get cluster layout and join group `my-group`
await consumer.start()
try:
# Consume messages
async for msg in consumer:
memory_check()
finally:
# Will leave consumer group; perform autocommit if enabled.
await consumer.stop()
def memory_check():
# https://tech.gadventures.com/hunting-for-memory-leaks-in-asyncio-applications-3614182efaf7
if tracemalloc.is_tracing():
view_data = {}
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
view_data['top_10_trace'] = [str(x) for x in top_stats[:10]]
print(view_data)
if __name__ == '__main__':
topic = 'my_topic'
# asyncio.ensure_future(consume(topic, group_id=None))
asyncio.ensure_future(consume(topic, group_id='mygroup-id'))
'''
output aiokafka v0.4.1 group id None
{'top_10_trace': ['/usr/lib/python3.6/tracemalloc.py:65: size=99.8 KiB, count=1420, average=72 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:124: size=7632 B, count=18, average=424 B', '/usr/lib/python3.6/asyncio/events.py:145: size=7104 B, count=21, average=338 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:128: size=5648 B, count=12, average=471 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:169: size=2928 B, count=15, average=195 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/aiokafka/client.py:506: size=2848 B, count=38, average=75 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:162: size=2784 B, count=6, average=464 B', '/usr/lib/python3.6/asyncio/tasks.py:313: size=2336 B, count=13, average=180 B', '/usr/lib/python3.6/asyncio/base_events.py:276: size=1776 B, count=17, average=104 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/aiokafka/conn.py:138: size=1664 B, count=3, average=555 B']}
output aiokafka v0.4.1 group id mygroup-id
{'top_10_trace': ['/usr/lib/python3.6/tracemalloc.py:65: size=78.0 KiB, count=1109, average=72 B', '/usr/lib/python3.6/asyncio/base_events.py:558: size=10008 B, count=86, average=116 B', '/usr/lib/python3.6/asyncio/events.py:145: size=7952 B, count=32, average=248 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:124: size=7552 B, count=17, average=444 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/aiokafka/conn.py:219: size=4736 B, count=8, average=592 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:169: size=4624 B, count=29, average=159 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:128: size=4256 B, count=10, average=426 B', '/usr/lib/python3.6/asyncio/tasks.py:313: size=3488 B, count=24, average=145 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/aiokafka/client.py:506: size=2920 B, count=39, average=75 B', '/usr/lib/python3.6/asyncio/base_events.py:276: size=2720 B, count=25, average=109 B']}
output aiokafka v0.4.2 group id None
{'top_10_trace': ['/usr/lib/python3.6/tracemalloc.py:65: size=90.0 KiB, count=1280, average=72 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:136: size=15.6 KiB, count=126, average=127 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:132: size=8112 B, count=19, average=427 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:177: size=6960 B, count=78, average=89 B', '/usr/lib/python3.6/asyncio/events.py:145: size=6664 B, count=20, average=333 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:170: size=3248 B, count=7, average=464 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/aiokafka/client.py:509: size=2920 B, count=39, average=75 B', '/usr/lib/python3.6/asyncio/tasks.py:313: size=2336 B, count=13, average=180 B', '/usr/lib/python3.6/asyncio/base_events.py:276: size=1904 B, count=19, average=100 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/aiokafka/conn.py:138: size=1664 B, count=3, average=555 B']}
output aiokafka v0.4.2 group id mygroup-id, notice tasks arise
{'top_10_trace': ['/usr/lib/python3.6/asyncio/tasks.py:688: size=123 KiB, count=927, average=136 B', '/usr/lib/python3.6/asyncio/base_events.py:276: size=104 KiB, count=953, average=112 B', '/usr/lib/python3.6/tracemalloc.py:498: size=55.2 KiB, count=883, average=64 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/aiokafka/consumer/group_coordinator.py:277: size=43.5 KiB, count=927, average=48 B', '/usr/lib/python3.6/tracemalloc.py:65: size=34.8 KiB, count=495, average=72 B', '/usr/lib/python3.6/tracemalloc.py:180: size=23.6 KiB, count=432, average=56 B', '/usr/lib/python3.6/asyncio/events.py:145: size=7952 B, count=32, average=248 B', '/usr/lib/python3.6/asyncio/tasks.py:704: size=7920 B, count=1, average=7920 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:132: size=7504 B, count=16, average=469 B', '/home/ubuntu/py3env/lib/python3.6/site-packages/kafka/protocol/types.py:177: size=4752 B, count=31, average=153 B']}
'''
asyncio.get_event_loop().run_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment