Instantly share code, notes, and snippets.

Embed
What would you like to do?
Methods to connect a Kafka producer and publish data.
def _to_bytes(data):
    """Return *data* as a bytes-like payload, UTF-8 encoding text input."""
    if data is None or isinstance(data, (bytes, bytearray, memoryview)):
        # None is a legal Kafka key/value (e.g. un-keyed messages), and
        # binary payloads are already encoded -- pass both through untouched.
        return data
    return bytes(data, encoding='utf-8')


def publish_message(producer_instance, topic_name, key, value):
    """Send one key/value message to a Kafka topic and wait for delivery.

    Args:
        producer_instance: Producer object exposing ``send(topic, key=...,
            value=...)`` that returns a future with ``get(timeout=...)``,
            and ``flush()`` (the kafka-python ``KafkaProducer`` interface).
        topic_name: Name of the topic to publish to.
        key: Message key. ``str`` is UTF-8 encoded; bytes-like objects and
            ``None`` are passed through unchanged.
        value: Message value, converted the same way as ``key``.

    Returns:
        None. The outcome is reported on stdout; failures are printed, not
        raised, so a single bad message does not abort the caller.
    """
    try:
        key_bytes = _to_bytes(key)
        value_bytes = _to_bytes(value)
        future = producer_instance.send(topic_name, key=key_bytes, value=value_bytes)
        producer_instance.flush()
        # send() is asynchronous: delivery errors are stored on the returned
        # future instead of being raised. Resolve it before reporting
        # success so a failed delivery ends up in the except branch below.
        future.get(timeout=10)
        print('Message published successfully.')
    except Exception as ex:
        print('Exception in publishing message')
        print(str(ex))
def connect_kafka_producer(bootstrap_servers=None, api_version=(0, 10)):
    """Create a Kafka producer, or return None if it cannot be created.

    Args:
        bootstrap_servers: List of ``'host:port'`` broker addresses handed to
            ``KafkaProducer``. Defaults to ``['localhost:9092']``.
        api_version: Broker API version tuple handed to ``KafkaProducer``.
            Defaults to ``(0, 10)``.

    Returns:
        The new ``KafkaProducer`` instance, or ``None`` when construction
        raised an ``Exception`` (the error is printed to stdout).
    """
    if bootstrap_servers is None:
        bootstrap_servers = ['localhost:9092']
    try:
        return KafkaProducer(bootstrap_servers=bootstrap_servers, api_version=api_version)
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
        # Returned here rather than from a ``finally`` clause: a ``return``
        # inside ``finally`` would also discard KeyboardInterrupt/SystemExit.
        return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment