Skip to content

Instantly share code, notes, and snippets.

@wengkham
Last active November 4, 2023 00:24
Show Gist options
  • Save wengkham/4ed48fb92d3f78804ca609ad8c89b3f8 to your computer and use it in GitHub Desktop.
Save wengkham/4ed48fb92d3f78804ca609ad8c89b3f8 to your computer and use it in GitHub Desktop.
import concurrent.futures
from confluent_kafka import TopicPartition
consumer = DeserializingConsumer(consumer_conf)
consumer.subscribe("my_topic")
def process_message(msg, consumer, topic_partition):
if msg is not None and msg.value() is not None:
# assuming you have id as key in your message
id = msg.value().id
# do your thing here
print(id)
# resume consuming partition
consumer.resume(topic_partition)
# commit the message once done
consumer.commit(msg)
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
while True:
msg = consumer.poll()
# create a TopicPartiton object
topic_partition = [TopicPartition(msg.topic(), msg.partition(), msg.offset())]
# pause consuming this partition
consumer.pause(topic_partition)
executor.submit(process_message, msg, consumer, topic_partition)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment