Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
from confluent_kafka import Producer
class SimpleProducer:
    """Thin wrapper around a confluent_kafka ``Producer`` that sends one message per call.

    Every :meth:`send` flushes the producer before returning, and the delivery
    outcome is reported on stdout by :meth:`callback_handler`.
    """

    def __init__(self, brokers: str):
        """
        Build the underlying ``Producer``.

        :param brokers: Value passed as ``bootstrap.servers``.
        """
        config = {
            'bootstrap.servers': brokers,
            'queue.buffering.max.ms': 1,
            'message.send.max.retries': 5,
            'retry.backoff.ms': 200,
            'message.max.bytes': 104857600,  # 100 * 1024 * 1024
            # Set at the top level: the nested 'default.topic.config' dict is
            # documented as deprecated by the client library.
            'request.required.acks': 'all',
        }
        self.producer = Producer(config)

    def send(self, topic: str, msg: bytes, headers=None) -> None:
        """
        Send a single message to a kafka topic and flush the producer.

        The payload is handed to the producer unchanged (no encoding is done
        here). The producer is flushed but not closed, so the same instance can
        be used for further calls.

        :param topic: The name of the kafka topic to put the message on.
        :param msg: The data to put on the kafka topic, already in byte form.
        :param headers: Optional kafka message headers. Forwarded to
            ``Producer.produce`` only when given, so calls without headers
            behave exactly as before.
        """
        # Serve delivery callbacks left over from earlier produce() calls.
        self.producer.poll(0)
        produce_kwargs = {'callback': self.callback_handler}
        if headers is not None:
            produce_kwargs['headers'] = headers
        self.producer.produce(topic, msg, **produce_kwargs)
        self.producer.flush()

    def callback_handler(self, err, msg) -> None:
        """
        Delivery callback: print whether the message was delivered.

        :param err: Delivery error, or ``None`` when delivery succeeded.
        :param msg: The message object; its ``topic()`` and ``partition()`` are
            read only on success.
        """
        if err is not None:
            print(f'Failed to deliver kafka message: {err}')
        else:
            print(f'Delivered kafka message to topic {msg.topic()} and partition {msg.partition()}')
if __name__ == '__main__':
    # Manual smoke run: send one fixed payload through a producer pointed at
    # the hard-coded local broker address.
    demo_brokers = 'localhost:29092'
    demo_topic = 'jon_test_topic'
    demo_payload = b'how much wood could a wood chuck chuck100'
    SimpleProducer(demo_brokers).send(demo_topic, demo_payload)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment