Skip to content

Instantly share code, notes, and snippets.

Last active December 17, 2021 18:09
Show Gist options
  • Save mikesparr/f4575222c88d84de9dd12a943f03d0ea to your computer and use it in GitHub Desktop.
Producer helper class using Confluent Kafka Python
#!/usr/bin/env python
# encoding: utf-8
"""Producer helper class using Confluent Kafka Python.

Created by Michael Sparr on 2016-01-11.
Copyright (c) 2016 Goomzee Corporation. All rights reserved.
"""
import sys
import os
import json
import uuid
import datetime
import time
import socket
import logging
import traceback

import confluent_kafka as kafka
"avro": "application/vnd.kafka.avro.v1+json",
"binary": "application/vnd.kafka.binary.v1+json",
"json": "application/vnd.kafka.json.v1+json"
def _printer(err, msg):
"""Test callback for producer"""
if not err:
print "Produced message and received: [{}]".format(msg.value().encode('utf-8'))
print "Received error sending message [{}] with msg [{}]".format(err, msg.value().encode('utf-8'))
class StreamProducer:
    """Thin helper around ``confluent_kafka.Producer``.

    Connects lazily on the first ``publish`` call and caches the producer
    on the instance for reuse.  ``cfg`` must contain a ``hosts`` list of
    ``host:port`` broker addresses.
    """

    def __init__(self, cfg):
        self.config = cfg      # expects {"hosts": ["host:port", ...]}
        self.producer = None   # created lazily by _get_connection()

    # create logger
    def __get_logger(self):
        """Instantiates logger."""
        return logging.getLogger(os.path.basename(__file__))

    def _get_connection(self):
        """Creates the Kafka producer and caches it on the instance.

        Raises:
            kafka.KafkaException: if the producer cannot be created.
        """
        logger = self.__get_logger()
        conf = {
            'bootstrap.servers': ','.join(map(str, self.config.get('hosts'))),
            'queue.buffering.max.messages': 500000,
            # NOTE(review): key was lost in extraction; 60000 ms matches the
            # librdkafka 'queue.buffering.max.ms' setting -- confirm.
            'queue.buffering.max.ms': 60000,
            'batch.num.messages': 100,
            'log.connection.close': False,
            # NOTE(review): key was lost in extraction; the hostname is the
            # conventional 'client.id' value -- confirm.
            'client.id': socket.gethostname(),
            'default.topic.config': {'acks': 'all'}
        }
        try:
            self.producer = kafka.Producer(**conf)  # set class level for reuse
        except Exception as e:
            logger.error("Error establishing Kafka producer")
            logger.debug(traceback.format_exc())
            raise kafka.KafkaException(e)

    def publish(self, data, topic='test', key=None, partition=None, callback=None):
        """Serializes ``data`` as compact JSON and produces it to ``topic``.

        Args:
            data: JSON-serializable payload.
            topic: destination topic name.
            key: optional message key.
            partition: optional explicit partition number.
            callback: optional delivery callback ``fn(err, msg)``.
        """
        logger = self.__get_logger()
        if self.producer is None:
            self._get_connection()
        # end if producer
        logger.debug("Sending message to topic [{}] with key [{}]".format(topic, key))
        try:
            kwargs = {'key': key, 'callback': callback}
            # Only pass partition when given so librdkafka's default
            # partitioner is used otherwise.
            if partition is not None:
                kwargs['partition'] = partition
            self.producer.produce(
                topic,
                json.dumps(data, separators=(',', ':')).encode('utf-8'),
                **kwargs)
            self.producer.poll(0)  # serve any pending delivery callbacks
        except BufferError:
            # Local produce queue is full; best-effort: log and drop.
            logger.error("BufferError publishing topic [{}]:[{}]-[{}]".format(topic, partition, key))
        except Exception:
            logger.error("Error publishing data at topic [{}]:[{}]-[{}]".format(topic, partition, key))
            logger.debug(traceback.format_exc())
def main():
    """Demo entry point: publish one message to the local 'test' topic."""
    cnf = {
        "hosts": ["localhost:9092"]
    }
    s = StreamProducer(cnf)
    # uuid1 gives a unique id per run; the remaining keys are demo payload.
    data = {"id": str(uuid.uuid1()), "some data": "some value", "some more data": "another value"}
    s.publish(data, topic='test', key='12345', callback=_printer)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment