Skip to content

Instantly share code, notes, and snippets.

@mikesparr
Last active December 17, 2021 18:09
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mikesparr/f4575222c88d84de9dd12a943f03d0ea to your computer and use it in GitHub Desktop.
Producer helper class using Confluent Kafka Python
#!/usr/bin/env python
# encoding: utf-8
"""
stream_producer.py
Created by Michael Sparr on 2016-01-11.
Copyright (c) 2016 Goomzee Corporation. All rights reserved.
"""
import sys
import os
import json
import uuid
import datetime
import time
import socket
import logging
import traceback
import confluent_kafka as kafka
# Content-Type headers for the Confluent Kafka REST proxy, keyed by
# serialization format (avro / binary / json).
# NOTE(review): not referenced anywhere in this file's visible code —
# presumably left over from an earlier REST-proxy-based producer; confirm
# before removing.
HEADERS = {
"avro": "application/vnd.kafka.avro.v1+json",
"binary": "application/vnd.kafka.binary.v1+json",
"json": "application/vnd.kafka.json.v1+json"
}
def _printer(err, msg):
"""Test callback for producer"""
print
if not err:
print "Produced message and received: [{}]".format(msg.value().encode('utf-8'))
else:
print "Received error sending message [{}] with msg [{}]".format(err, msg.value().encode('utf-8'))
class StreamProducer:
    """Thin wrapper around confluent_kafka.Producer for publishing JSON.

    Args:
        cfg: configuration dict; must contain:
            hosts: list of "host:port" Kafka broker addresses.
    """

    def __init__(self, cfg):
        self.config = cfg
        self.producer = None  # created lazily on first publish()

    def __get_logger(self):
        """Return a logger named after this file."""
        return logging.getLogger(os.path.basename(__file__))

    def _get_connection(self):
        """Create the underlying Kafka producer and cache it on self.

        Raises:
            kafka.KafkaException: if the producer cannot be created.
        """
        logger = self.__get_logger()
        try:
            conf = {
                'bootstrap.servers': ','.join(map(str, self.config.get('hosts'))),
                'queue.buffering.max.messages': 500000,
                'queue.buffering.max.ms': 60000,
                'batch.num.messages': 100,
                'log.connection.close': False,
                'client.id': socket.gethostname(),
                'default.topic.config': {'acks': 'all'}
            }
            # Producer takes the config dict positionally (documented form);
            # dotted keys such as 'bootstrap.servers' are not valid Python
            # keyword-argument names, so **conf is not safe here.
            self.producer = producer = kafka.Producer(conf)
        except Exception as e:
            logger.error("Error establishing Kafka producer")
            logger.debug(traceback.format_exc())
            raise kafka.KafkaException(e)

    def publish(self, data, topic='test', key=None, partition=None, callback=None):
        """Serialize `data` to compact JSON and produce it to `topic`.

        Args:
            data: JSON-serializable payload.
            topic: destination topic name.
            key: optional message key.
            partition: optional explicit partition; when None, the broker's
                partitioner decides (previously this argument was ignored).
            callback: optional delivery-report callback (err, msg).

        Errors are logged, not raised; delivery is asynchronous.
        """
        logger = self.__get_logger()
        try:
            if self.producer is None:
                self._get_connection()
            logger.debug("Sending message to topic [{}] with key [{}]".format(topic, key))
            kwargs = {'key': key, 'on_delivery': callback}
            if partition is not None:
                # Honor an explicitly requested partition.
                kwargs['partition'] = partition
            self.producer.produce(
                topic,
                json.dumps(data, separators=(',', ':')).encode('utf-8'),
                **kwargs
            )
            # Serve queued delivery callbacks without blocking; a full
            # flush() is deliberately left to the caller.
            self.producer.poll(0)
        except BufferError:
            logger.error("BufferError publishing topic [{}]:[{}]-[{}]".format(topic, partition, key))
        except Exception:
            logger.error("Error publishing data at topic [{}]:[{}]-[{}]".format(topic, partition, key))
            logger.debug(traceback.format_exc())
def main():
    """Smoke test: publish one JSON message to a local broker."""
    logging.basicConfig(level=logging.DEBUG)
    cnf = {
        "hosts": ["localhost:9092"]
    }
    s = StreamProducer(cnf)
    data = {
        "id": str(uuid.uuid1()),
        "some data": "some value",
        "some more data": "another value",
    }
    s.publish(data, topic='test', key='12345', callback=_printer)
    # Flush before exit: the producer buffers for up to
    # queue.buffering.max.ms (60s), so without this the process could
    # terminate before the message is ever delivered.
    if s.producer is not None:
        s.producer.flush()


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment