Skip to content

Instantly share code, notes, and snippets.

@Richard-Mathie
Created October 31, 2018 16:28
Show Gist options
  • Save Richard-Mathie/c1202e23bb60c369b6e8e8f58f29527b to your computer and use it in GitHub Desktop.
Save Richard-Mathie/c1202e23bb60c369b6e8e8f58f29527b to your computer and use it in GitHub Desktop.
Kafka Consumer for Long Running Process
import logging
from queue import Queue, Empty
from threading import Thread, Event

from kafka import KafkaConsumer, KafkaProducer
from kafka.consumer.fetcher import ConsumerRecord
from kafka.errors import CommitFailedError
from kafka.structs import TopicPartition, OffsetAndMetadata
class KafkaWorker(object):
    """Poll Kafka on the calling thread and process records on a worker thread.

    ``consume()`` polls the consumer and pushes every record onto
    ``self.queue``.  A background thread (started by ``__init__``) pops
    records, hands them to ``process_record()`` and then queues them on
    ``self.commits``.  ``consume()`` drains that queue after each poll and
    commits the offsets, so all consumer calls are made from the thread that
    runs ``consume()``.

    Subclasses override ``process_record`` and, optionally, the
    ``consumer_topics`` / ``consumer_options`` / ``producer_options``
    properties.
    """

    def __init__(self, topics, hosts, poll_timeout=1):
        """Connect to Kafka and start the processing thread.

        Args:
            topics: A topic name, or a list/tuple of topic names.
            hosts: Passed as ``bootstrap_servers`` to the consumer and producer.
            poll_timeout: Seconds to wait in each consumer poll and in each
                idle wait of the processing thread.
        """
        self.poll_timeout = poll_timeout
        self.poll_timeout_ms = poll_timeout * 1000
        self.topics = topics
        self.hosts = hosts
        self.queue = Queue()  # type: Queue[ConsumerRecord]
        # Records that have been processed and are waiting to be committed.
        self.commits = Queue()
        self._stop_processing = Event()
        self.connect_kafka()
        self.worker_thread = Thread(target=self.processor)
        self.worker_thread.start()

    # The annotation is a string so it is not evaluated when the class is
    # created.
    def process_record(self, record: "ConsumerRecord"):
        """Handle one record; the base implementation does nothing."""
        pass

    @property
    def consumer_topics(self):
        """Topics to subscribe to, always as a sequence.

        A list or tuple given to ``__init__`` is returned unchanged; any other
        value is wrapped in a one-element tuple.
        """
        if isinstance(self.topics, (list, tuple)):
            return self.topics
        return (self.topics,)

    @property
    def consumer_options(self):
        """Extra keyword arguments for ``KafkaConsumer`` (none by default)."""
        return {}

    @property
    def producer_options(self):
        """Extra keyword arguments for ``KafkaProducer`` (none by default)."""
        return {}

    def connect_kafka(self):
        """Create ``self.consumer`` and ``self.producer`` for ``self.hosts``."""
        self.consumer = KafkaConsumer(*self.consumer_topics,
                                      bootstrap_servers=self.hosts,
                                      **self.consumer_options)
        self.producer = KafkaProducer(bootstrap_servers=self.hosts,
                                      **self.producer_options)

    def consume(self):
        """Poll forever, queueing records and committing processed ones.

        The loop has no exit condition; it ends only when an exception
        (for example ``KeyboardInterrupt``) propagates to the caller.
        """
        while True:
            topic_records = self.consumer.poll(timeout_ms=self.poll_timeout_ms)
            for records in topic_records.values():
                for record in records:
                    # Lazy %-formatting: the record is only rendered when
                    # debug logging is enabled.
                    logging.debug('got message %s', record)
                    self.queue.put(record)
            offsets = self._pending_offsets()
            # Only talk to the consumer when something was actually processed
            # since the previous poll.
            if offsets:
                self.consumer.commit(offsets)

    def _pending_offsets(self):
        """Drain ``self.commits`` into a ``{TopicPartition: OffsetAndMetadata}`` map."""
        offsets = {}
        while True:
            try:
                record = self.commits.get_nowait()
            except Empty:
                return offsets
            topic_partition = TopicPartition(record.topic, record.partition)
            # Commit the position just past the processed record.  A later
            # record for the same partition overwrites an earlier entry.
            offsets[topic_partition] = OffsetAndMetadata(record.offset + 1, None)

    def processor(self):
        """Worker-thread loop: process queued records until ``stop()`` is called."""
        while not self._stop_processing.is_set():
            try:
                record = self.queue.get_nowait()
            except Empty:
                # Nothing queued: sleep, but wake immediately on stop().
                self._stop_processing.wait(timeout=self.poll_timeout)
                continue
            self.process_record(record)
            self.commit(record)
            self.queue.task_done()

    def commit(self, record):
        """Mark ``record`` as processed; ``consume()`` commits it after its next poll."""
        self.commits.put(record)

    def stop(self):
        """Signal the worker thread to finish and wait for it to exit."""
        self._stop_processing.set()
        self.worker_thread.join()
import logging
import time
import sys
from kafka.consumer.fetcher import ConsumerRecord
from kafka_worker import KafkaWorker
# Module-level logger used by the demo worker and the script body.
logger = logging.getLogger(__name__)
# Kafka address handed to Worker(...), which uses it as bootstrap_servers.
hosts = "localhost:29092"
# Topic the demo both produces to and consumes from.
# NOTE(review): "toppic" looks like a typo for "topic"; left unchanged because
# renaming it would change which topic is used.
topic = "test_toppic"
def F(n):
    """Return the n-th Fibonacci number via naive double recursion.

    Left unmemoized on purpose: the demo calls it only to burn CPU time and
    stand in for a long-running job.
    """
    if n == 0:
        return 0
    if n == 1:
        return 1
    return F(n - 1) + F(n - 2)
class Worker(KafkaWorker):
    """Demo worker that spends a long time on every record it receives."""

    # Number of records handled so far (shadowed per instance on first use).
    count = 0

    def process_record(self, record: ConsumerRecord):
        """Count the record, print a progress dot, then run the slow task."""
        self.count += 1
        logger.debug(f'processing message {record}')
        sys.stdout.write('.')
        # Flush the progress dots once every hundred records.
        if not self.count % 100:
            sys.stdout.flush()
        F(40)

    @property
    def consumer_topics(self):
        """Subscribe to the module-level ``topic`` only."""
        return (topic,)

    @property
    def consumer_options(self):
        """Keyword arguments passed to the Kafka consumer."""
        return {
            'group_id': 'test1',
            'auto_offset_reset': 'earliest',
            'enable_auto_commit': False,
            'auto_commit_interval_ms': 1000,
            'max_poll_interval_ms': 2000,
        }

    @property
    def producer_options(self):
        """Keyword arguments passed to the Kafka producer."""
        return {
            'compression_type': 'lz4',
            'key_serializer': lambda key: key.encode(),
            'value_serializer': lambda value: value.encode(),
        }
logging.basicConfig(level=logging.INFO)

# Build the worker before entering the try block so the cleanup below never
# refers to a name that was not bound.
worker = Worker(topic, hosts)
try:
    # Seed the topic with 100 messages, then consume until interrupted.
    for i in range(100):
        worker.producer.send(topic, key=f'{i}', value=f'hello {i}')
    worker.consume()
except KeyboardInterrupt:
    logger.info("Got Keyboard Interrupt")
finally:
    # Always stop the worker thread: it is not a daemon thread, so leaving it
    # running after any other exception would keep the process alive.
    worker.stop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment