Skip to content

Instantly share code, notes, and snippets.

@gtato
Last active August 25, 2021 11:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gtato/14dea777664239e78ded9b3b82a1d1e6 to your computer and use it in GitHub Desktop.
Save gtato/14dea777664239e78ded9b3b82a1d1e6 to your computer and use it in GitHub Desktop.
Pause and resume kafka
import queue
from confluent_kafka import Producer, Consumer, TopicPartition, OFFSET_END, OFFSET_BEGINNING
from multiprocessing import Queue
from threading import Thread
import time
import string
import random
def get_topics(nr=5):
return [f'topic.{i}' for i in range(nr)]
def get_producer(brokers):
producer = Producer({'bootstrap.servers': brokers})
return producer
def get_consumer(brokers, group):
consumer = Consumer({
'bootstrap.servers': brokers,
'group.id': group,
'delivery.report.only.error': True,
'enable.auto.commit': True,
'auto.offset.reset': 'latest',
# 'max.poll.interval.ms': 10000
})
return consumer
# start producing on a randomly in one of the predefined topics
def produce_thread(stop):
brokers = 'localhost:9092'
p = get_producer(brokers)
topics = get_topics()
indexes = {t: 0 for t in topics}
key = ''.join(random.choices(string.ascii_uppercase + string.digits, k=5))
print(f'producer key: {key}')
while True:
if stop():
break
topic = random.choice(topics)
# print(f'producer producing on {topic}')
p.produce(topic, f'{key}-{indexes[topic]} on topic {topic}')
indexes[topic] += 1
time.sleep(1)
# picks randomly if to subscribe or unsubscribe to a topic
def topic_selector(queue: Queue, stop, timer=5):
topics = get_topics()
actions = ['subscribe', 'unsubscribe']
while True:
if stop():
break
topic = random.choice(topics)
action = random.choice(actions)
topic_action = {'action': action, 'topic': topic}
# print(f'putting {topic_action}')
queue.put_nowait(topic_action)
time.sleep(timer)
def kafka_test():
brokers = 'localhost:9092'
c = get_consumer(brokers, '111')
topic_queue = Queue()
stop_producer = False
producer_thread = Thread(target=produce_thread, args=[lambda: stop_producer])
producer_thread.start()
topic_selector_thread = Thread(target=topic_selector, args=[topic_queue, lambda: stop_producer, 10])
topic_selector_thread.start()
i = 0
nr_topics_subscribed = 0
# the consumer gets directions from the topic selector and subscribes or unsubscribes to specific topics
while True:
try:
topic_action = topic_queue.get_nowait()
assignment, changed = handle_subscription(c, topic_action)
if changed:
print(f'applied {topic_action}')
nr_topics_subscribed = len(assignment)
except queue.Empty:
pass
if nr_topics_subscribed == 0:
print('not subscribed to any topics, going to sleep')
time.sleep(1)
continue
msg = c.poll(-1.0)
if msg:
pos = c.position(c.assignment())
print(f'got {msg.value()} ... offset {pos[0].offset}')
if i == 30:
break
i += 1
stop_producer = True
producer_thread.join()
topic_selector_thread.join()
def handle_subscription(consumer: Consumer, topic_action):
changed = False
assignment = consumer.assignment()
assigned_topics = [assig.topic for assig in assignment]
subscribed = topic_action['topic'] in assigned_topics
subscribe_action = topic_action['action'] == 'subscribe'
if not subscribed and subscribe_action:
topic_partition = TopicPartition(topic=topic_action['topic'], partition=0, offset=OFFSET_END)
assignment.append(topic_partition)
changed = True
if subscribed and not subscribe_action:
assignment.pop(assigned_topics.index(topic_action['topic']))
changed = True
if changed:
consumer.assign(assignment)
return assignment, changed
if __name__ == '__main__':
kafka_test()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment