Skip to content

Instantly share code, notes, and snippets.

@satendrakumar
Created January 10, 2018 04:43
Show Gist options
  • Save satendrakumar/87a98ce71115579992091c93aca9e6c6 to your computer and use it in GitHub Desktop.
Save satendrakumar/87a98ce71115579992091c93aca9e6c6 to your computer and use it in GitHub Desktop.
# confluent-kafka producer configured for Kafka 0.8.2 brokers
import json
from confluent_kafka import Producer
class KafkaProducer:
    """Publish JSON-encoded messages through a confluent_kafka ``Producer``.

    The producer is configured with ``api.version.request`` disabled and
    ``broker.version.fallback`` set to ``'0.8.2'``.
    NOTE(review): presumably because 0.8.2 brokers cannot answer the API
    version request -- confirm against the librdkafka configuration docs.
    """

    def __init__(self, hosts):
        """Create the underlying producer.

        Args:
            hosts: Value handed to the ``bootstrap.servers`` setting; it is
                also kept on ``self.hosts`` for the log line in ``send``.
        """
        self.hosts = hosts
        self.producer = Producer(
            {
                'bootstrap.servers': hosts,
                'api.version.request': False,
                'broker.version.fallback': '0.8.2',
            }
        )

    @staticmethod
    def _report_delivery(err, msg):
        """Delivery callback: print a failed delivery instead of losing it.

        Args:
            err: Error reported for the message, or ``None`` on success.
            msg: The message the report refers to (unused here).
        """
        if err is not None:
            print("Message delivery failed: " + str(err))

    def send(self, topic, message):
        """Serialize ``message`` to UTF-8 JSON, produce it, and flush.

        Args:
            topic: Name of the topic to produce to.
            message: Any object ``json.dumps`` can serialize.

        Raises:
            TypeError: If ``message`` is not JSON-serializable
                (raised by ``json.dumps``).
        """
        # str() keeps the log line from raising when hosts is not a string.
        print("sending to hosts" + str(self.hosts))
        print("Sending message..... " + str(message))
        payload = json.dumps(message).encode('utf-8')
        # The callback surfaces per-message delivery errors, which would
        # otherwise go unnoticed.
        self.producer.produce(topic, payload, callback=self._report_delivery)
        self.producer.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment