@pvsune
Last active February 24, 2024 15:20
A multiprocess, multithreaded Kafka consumer
#!/usr/bin/env python
import logging
import os
import threading
import time
from multiprocessing import Process
from queue import Queue

from confluent_kafka import Consumer


def _process_msg(q, c):
    msg = q.get(timeout=60)  # Set timeout to care for POSIX<3.0 and Windows.
    logging.info(
        '#%sT%s - Received message: %s',
        os.getpid(), threading.get_ident(), msg.value().decode('utf-8')
    )
    time.sleep(5)  # Simulate slow processing of the message.
    q.task_done()
    c.commit(msg)


def _consume(config):
    logging.info(
        '#%s - Starting consumer group=%s, topic=%s',
        os.getpid(), config['kafka_kwargs']['group.id'], config['topic'],
    )
    c = Consumer(**config['kafka_kwargs'])
    c.subscribe([config['topic']])
    q = Queue(maxsize=config['num_threads'])

    while True:
        logging.info('#%s - Waiting for message...', os.getpid())
        try:
            msg = c.poll(60)
            if msg is None:
                continue
            if msg.error():
                logging.error(
                    '#%s - Consumer error: %s', os.getpid(), msg.error()
                )
                continue
            q.put(msg)
            # Use default daemon=False to stop threads gracefully in order to
            # release resources properly.
            t = threading.Thread(target=_process_msg, args=(q, c))
            t.start()
        except Exception:
            logging.exception('#%s - Worker terminated.', os.getpid())
            c.close()


def main(config):
    """
    Simple program that consumes messages from a Kafka topic and logs them.
    """
    workers = []
    while True:
        num_alive = len([w for w in workers if w.is_alive()])
        if config['num_workers'] == num_alive:
            time.sleep(1)  # All workers are up; avoid busy-waiting.
            continue
        for _ in range(config['num_workers'] - num_alive):
            p = Process(target=_consume, daemon=True, args=(config,))
            p.start()
            workers.append(p)
            logging.info('Starting worker #%s', p.pid)


if __name__ == '__main__':
    logging.basicConfig(
        level=getattr(logging, os.getenv('LOGLEVEL', '').upper(), 'INFO'),
        format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s',
    )
    main(config={
        # At most, this should be the total number of Kafka partitions on
        # the topic.
        'num_workers': 4,
        'num_threads': 4,
        'topic': 'my_topic_name',
        'kafka_kwargs': {
            'bootstrap.servers': ','.join([
                'cluster1.mykafka.com',
                'cluster2.mykafka.com',
                'cluster3.mykafka.com',
            ]),
            'group.id': 'my_consumer_group',
            'auto.offset.reset': 'earliest',
            # Commit manually to care for abrupt shutdown.
            'enable.auto.commit': False,
        },
    })
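
To exercise the consumer locally, a small test producer can push a few messages onto the same topic. The snippet below is a minimal sketch, not part of the original gist; it assumes the same confluent_kafka package and reuses the broker address and topic name from the config above, so adjust those to your environment.

#!/usr/bin/env python
from confluent_kafka import Producer


def _delivery_report(err, msg):
    # Called once per message to confirm delivery or surface an error.
    if err is not None:
        print('Delivery failed: %s' % err)
    else:
        print('Delivered to %s [%d]' % (msg.topic(), msg.partition()))


if __name__ == '__main__':
    p = Producer({'bootstrap.servers': 'cluster1.mykafka.com'})
    for i in range(10):
        p.produce(
            'my_topic_name',
            value=('message #%d' % i).encode('utf-8'),
            callback=_delivery_report,
        )
    p.flush()  # Block until all queued messages are delivered.

With the consumer script running, each delivered message should show up in its logs tagged with the process ID and thread ID that handled it.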