Skip to content

Instantly share code, notes, and snippets.

import concurrent.futures
from confluent_kafka import TopicPartition
# Build the consumer from the externally supplied configuration and register
# its topic subscription.
consumer = DeserializingConsumer(consumer_conf)
# subscribe() expects a list of topic names; a bare string raises TypeError.
consumer.subscribe(["my_topic"])
def process_message(msg, consumer, topic_partition):
    """Read the ``id`` attribute from a polled message's value.

    Args:
        msg: Polled message object, or ``None`` when nothing was received.
            Its ``value()`` result is expected to expose an ``id``
            attribute (assumption taken from the original comment --
            confirm against the deserializer in use).
        consumer: Not used by the visible part of this function.
        topic_partition: Not used by the visible part of this function.

    Returns:
        None in every case.

    Raises:
        AttributeError: If the message value has no ``id`` attribute.
    """
    if msg is None:
        return
    # Fetch the value once instead of calling msg.value() twice.
    payload = msg.value()
    if payload is None:
        return
    # Named ``message_id`` so the builtin ``id`` is not shadowed.
    # NOTE(review): nothing consumes this yet -- the function appears to be
    # truncated here; the actual processing presumably follows.
    message_id = payload.id
from threading import Timer
def callme(foo):
    """Print a line announcing that the callback ran.

    Args:
        foo: Value interpolated at the start of the printed message.
    """
    print(f"{foo}, I have been called")
# Schedule callme("hello") to run on a timer thread 10 seconds from now.
t = Timer(interval=10, function=callme, args=["hello"])
t.start()
# start() returns right away, so this prints before the timer fires.
print("Run code immediately here.")
# Block until the timer thread has finished.
t.join()
import asyncio
# Build the consumer from the externally supplied configuration and register
# its topic subscription.
consumer = DeserializingConsumer(consumer_conf)
# subscribe() expects a list of topic names; a bare string raises TypeError.
consumer.subscribe(["my_topic"])
async def batch_poll(consumer):
    """Run one ``consumer.poll()`` call off the event loop and return its result.

    ``poll`` is called with no arguments, so whatever default timeout the
    consumer applies is in effect. The call runs on the event loop's default
    executor, so the loop stays responsive while the poll is pending.

    Args:
        consumer: Object exposing a blocking ``poll()`` method.

    Returns:
        Whatever ``consumer.poll()`` returns.
    """
    loop = asyncio.get_running_loop()
    # Use the loop's shared default executor rather than creating and tearing
    # down a ThreadPoolExecutor per call; this also avoids a blocking
    # shutdown(wait=True) on the loop thread if this coroutine is cancelled.
    message = await loop.run_in_executor(None, consumer.poll)
    return message