@mze3e
Created April 14, 2021 13:33
Python Kafka Producer Examples
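
The examples below are collected from open-source projects that use the kafka-python library. As a baseline for comparison, a minimal producer sketch might look like this (assuming a broker reachable at localhost:9092 and a hypothetical 'events' topic):

from kafka import KafkaProducer
import json

# Assumes a local broker; swap in your own bootstrap servers.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# send() is asynchronous and returns a future; get() blocks until the
# broker acknowledges the record (or the timeout expires).
future = producer.send('events', {'hello': 'kafka'})
record_metadata = future.get(timeout=10)
print(record_metadata.topic, record_metadata.partition, record_metadata.offset)

producer.flush()
producer.close()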
#Example 1
#Project: dino Author: thenetcircle File: kafka.py License: Apache License 2.0
def __init__(self, env, is_external_queue: bool):
    super().__init__(env, is_external_queue, queue_type='kafka', logger=logger)
    eq_host = env.config.get(ConfigKeys.HOST, domain=self.domain_key, default=None)
    eq_queue = env.config.get(ConfigKeys.QUEUE, domain=self.domain_key, default=None)

    if eq_host is None or len(eq_host) == 0 or (type(eq_host) == str and len(eq_host.strip()) == 0):
        logging.warning('blank external host specified, not setting up external publishing')
        return

    if eq_queue is None or len(eq_queue.strip()) == 0:
        logging.warning('blank external queue specified, not setting up external publishing')
        return

    if type(eq_host) == str:
        eq_host = [eq_host]

    from kafka import KafkaProducer
    import json

    self.queue = eq_queue
    self.queue_connection = KafkaProducer(
        bootstrap_servers=eq_host,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    logger.info('setting up pubsub for type "{}" and host(s) "{}"'.format(self.queue_type, ','.join(eq_host)))
#Example 2
#Project: dino Author: thenetcircle File: kafka.py License: Apache License 2.0
def try_publish(self, message):
    if self.env.enrichment_manager is not None:
        message = self.env.enrichment_manager.handle(message)

    topic_key = None

    # try to get some consistency
    try:
        target = message.get('target', dict())
        topic_key = target.get('id', None)

        if topic_key is None:
            actor = message.get('actor', dict())
            topic_key = actor.get('id', None)

        # kafka publisher can't handle string keys
        topic_key = bytes(str(topic_key), encoding='utf-8')
    except Exception as partition_e:
        logger.exception(traceback.format_exc())
        environ.env.capture_exception(partition_e)

    # for kafka, the queue_connection is the KafkaProducer and queue is the topic name
    self.queue_connection.send(
        topic=self.queue, value=message, key=topic_key)
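
Example 2 keys each record by a target or actor id so that related messages hash to the same partition and stay ordered. A minimal sketch of the same idea, assuming a hypothetical 'chat-events' topic:

from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Records that share a key are hashed to the same partition by the
# default partitioner, which preserves per-key ordering.
user_id = '42'
producer.send('chat-events', value={'event': 'joined'}, key=user_id.encode('utf-8'))
producer.send('chat-events', value={'event': 'spoke'}, key=user_id.encode('utf-8'))
producer.flush()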
#Example 3
#Project: scrapy-cluster Author: istresearch File: rest_service.py License: MIT License
def _create_producer(self):
    """Tries to establish a Kafka producer connection"""
    if not self.closed:
        try:
            self.logger.debug("Creating new kafka producer using brokers: " +
                              str(self.settings['KAFKA_HOSTS']))

            return KafkaProducer(bootstrap_servers=self.settings['KAFKA_HOSTS'],
                                 value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                                 retries=3,
                                 linger_ms=self.settings['KAFKA_PRODUCER_BATCH_LINGER_MS'],
                                 buffer_memory=self.settings['KAFKA_PRODUCER_BUFFER_BYTES'])
        except KeyError as e:
            self.logger.error('Missing setting named ' + str(e),
                              {'ex': traceback.format_exc()})
        except:
            self.logger.error("Couldn't initialize kafka producer.",
                              {'ex': traceback.format_exc()})
            raise
#Example 4
#Project: scrapy-cluster Author: istresearch File: kafka_monitor.py License: MIT License
def _create_producer(self):
    """Tries to establish a Kafka producer connection"""
    try:
        brokers = self.settings['KAFKA_HOSTS']
        self.logger.debug("Creating new kafka producer using brokers: " +
                          str(brokers))

        return KafkaProducer(bootstrap_servers=brokers,
                             value_serializer=lambda m: json.dumps(m),
                             retries=3,
                             linger_ms=self.settings['KAFKA_PRODUCER_BATCH_LINGER_MS'],
                             buffer_memory=self.settings['KAFKA_PRODUCER_BUFFER_BYTES'])
    except KeyError as e:
        self.logger.error('Missing setting named ' + str(e),
                          {'ex': traceback.format_exc()})
    except:
        self.logger.error("Couldn't initialize kafka producer.",
                          {'ex': traceback.format_exc()})
        raise
#Example 5
#Project: scrapy-cluster Author: istresearch File: kafka_base_monitor.py License: MIT License
def _create_producer(self, settings):
    """Tries to establish a Kafka producer connection"""
    try:
        brokers = settings['KAFKA_HOSTS']
        self.logger.debug("Creating new kafka producer using brokers: " +
                          str(brokers))

        return KafkaProducer(bootstrap_servers=brokers,
                             value_serializer=lambda m: json.dumps(m),
                             retries=3,
                             linger_ms=settings['KAFKA_PRODUCER_BATCH_LINGER_MS'],
                             buffer_memory=settings['KAFKA_PRODUCER_BUFFER_BYTES'])
    except KeyError as e:
        self.logger.error('Missing setting named ' + str(e),
                          {'ex': traceback.format_exc()})
    except:
        self.logger.error("Couldn't initialize kafka producer in plugin.",
                          {'ex': traceback.format_exc()})
        raise
#Example 6
#Project: rasa_core Author: RasaHQ File: broker.py License: Apache License 2.0
def _create_producer(self):
    import kafka

    if self.security_protocol == 'SASL_PLAINTEXT':
        self.producer = kafka.KafkaProducer(
            bootstrap_servers=[self.host],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            sasl_plain_username=self.sasl_username,
            sasl_plain_password=self.sasl_password,
            sasl_mechanism='PLAIN',
            security_protocol=self.security_protocol)
    elif self.security_protocol == 'SSL':
        self.producer = kafka.KafkaProducer(
            bootstrap_servers=[self.host],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            ssl_cafile=self.ssl_cafile,
            ssl_certfile=self.ssl_certfile,
            ssl_keyfile=self.ssl_keyfile,
            ssl_check_hostname=False,
            security_protocol=self.security_protocol)
#Example 7
#Project: webhook-shims Author: vmw-loginsight File: kafkatopic.py License: Apache License 2.0
def kafka(TOPIC=None):
    # Lazy init of the Kafka producer
    global PRODUCER
    if PRODUCER is None:
        PRODUCER = KafkaProducer(
            bootstrap_servers=KAFKA_BOOSTRAP_SERVERS,
            sasl_mechanism=KAFKA_SASL_MECHANISM,
            sasl_plain_username=KAFKA_USER,
            sasl_plain_password=KAFKA_PASSWORD)
    try:
        future = PRODUCER.send(TOPIC, request.get_data())
        future.get(timeout=60)
        return "OK", 200, None
    except KafkaTimeoutError:
        return "Internal Server Error", 500, None
#Example 8
#Project: ozymandias Author: pambot File: ozy_producer.py License: MIT License
def main(n):
    """Stream the video into a Kafka producer in an infinite loop"""
    topic = choose_channel(n)
    video_reader = imageio.get_reader(DATA + topic + '.mp4', 'ffmpeg')
    metadata = video_reader.get_meta_data()
    fps = metadata['fps']

    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             batch_size=15728640,
                             linger_ms=1000,
                             max_request_size=15728640,
                             value_serializer=lambda v: json.dumps(v.tolist()))

    while True:
        video_loop(video_reader, producer, topic, fps)
#Example 9
#Project: rasa-for-botfront Author: botfront File: kafka.py License: Apache License 2.0
def _create_producer(self) -> None:
    import kafka

    if self.security_protocol == "SASL_PLAINTEXT":
        self.producer = kafka.KafkaProducer(
            bootstrap_servers=[self.host],
            value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
            sasl_plain_username=self.sasl_username,
            sasl_plain_password=self.sasl_password,
            sasl_mechanism="PLAIN",
            security_protocol=self.security_protocol,
        )
    elif self.security_protocol == "SSL":
        self.producer = kafka.KafkaProducer(
            bootstrap_servers=[self.host],
            value_serializer=lambda v: json.dumps(v).encode(DEFAULT_ENCODING),
            ssl_cafile=self.ssl_cafile,
            ssl_certfile=self.ssl_certfile,
            ssl_keyfile=self.ssl_keyfile,
            ssl_check_hostname=False,
            security_protocol=self.security_protocol,
        )
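
Examples 6 and 9 treat SASL_PLAINTEXT and SSL as separate branches. kafka-python also supports combining the two as SASL_SSL, which authenticates over an encrypted connection; a sketch with placeholder credentials and certificate paths:

import json
import kafka

# All hostnames, credentials, and file paths here are placeholders.
producer = kafka.KafkaProducer(
    bootstrap_servers=['broker.example.com:9093'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    security_protocol='SASL_SSL',  # TLS transport plus SASL authentication
    sasl_mechanism='PLAIN',
    sasl_plain_username='alice',
    sasl_plain_password='secret',
    ssl_cafile='/etc/ssl/certs/ca.pem')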
#Example 10
#Project: ColumbiaImageSearch Author: ColumbiaDVMM File: kafka_pusher.py License: Apache License 2.0
def init_producer(self):
    """Initialize KafkaProducer
    """
    print("[{}: log] Initializing producer...".format(self.pp))
    # Gather optional parameters
    dict_args = dict()
    #dict_args = self.get_servers(dict_args, 'producer_servers')
    #dict_args = self.get_security(dict_args, 'producer_security')
    dict_args = self.get_servers(dict_args, 'servers')
    dict_args = self.get_security(dict_args, 'security')
    # Instantiate producer
    try:
        self.producer = KafkaProducer(**dict_args)
    except Exception as inst:
        msg = "[{}: ERROR] Could not initialize producer with arguments {}. Error was: {}"
        raise RuntimeError(msg.format(self.pp, dict_args, inst))
    self.topic_name = self.get_required_param("topic_name")
#Example 11
#Project: karapace Author: aiven File: karapace.py License: Apache License 2.0
def _create_producer(self):
    while True:
        try:
            return KafkaProducer(
                bootstrap_servers=self.config["bootstrap_uri"],
                security_protocol=self.config["security_protocol"],
                ssl_cafile=self.config["ssl_cafile"],
                ssl_certfile=self.config["ssl_certfile"],
                ssl_keyfile=self.config["ssl_keyfile"],
                api_version=(1, 0, 0),
                metadata_max_age_ms=self.config["metadata_max_age_ms"],
                max_block_ms=2000  # missing topics will block unless we cache cluster metadata and pre-check
            )
        except:  # pylint: disable=bare-except
            self.log.exception("Unable to create producer, retrying")
            time.sleep(1)
#Example 12
#Project: pyspark-twitter-stream-mining Author: amir-rahnama File: twitter_stream.py License: MIT License
def __init__(self):
    self.producer = KafkaProducer(bootstrap_servers='docker:9092', value_serializer=lambda v: json.dumps(v))
    self.tweets = []
#Example 13
#Project: scrapy-cluster Author: istresearch File: pipelines.py License: MIT License
def from_settings(cls, settings):
    my_level = settings.get('SC_LOG_LEVEL', 'INFO')
    my_name = settings.get('SC_LOGGER_NAME', 'sc-logger')
    my_output = settings.get('SC_LOG_STDOUT', True)
    my_json = settings.get('SC_LOG_JSON', False)
    my_dir = settings.get('SC_LOG_DIR', 'logs')
    my_bytes = settings.get('SC_LOG_MAX_BYTES', '10MB')
    my_file = settings.get('SC_LOG_FILE', 'main.log')
    my_backups = settings.get('SC_LOG_BACKUPS', 5)
    my_appids = settings.get('KAFKA_APPID_TOPICS', False)

    logger = LogFactory.get_instance(json=my_json,
                                     name=my_name,
                                     stdout=my_output,
                                     level=my_level,
                                     dir=my_dir,
                                     file=my_file,
                                     bytes=my_bytes,
                                     backups=my_backups)

    try:
        producer = KafkaProducer(bootstrap_servers=settings['KAFKA_HOSTS'],
                                 retries=3,
                                 linger_ms=settings['KAFKA_PRODUCER_BATCH_LINGER_MS'],
                                 buffer_memory=settings['KAFKA_PRODUCER_BUFFER_BYTES'])
    except Exception as e:
        logger.error("Unable to connect to Kafka in Pipeline"
                     ", raising exit flag.")
        # this is critical so we choose to exit.
        # exiting because this is a different thread from the crawlers
        # and we want to ensure we can connect to Kafka when we boot
        sys.exit(1)

    topic_prefix = settings['KAFKA_TOPIC_PREFIX']
    use_base64 = settings['KAFKA_BASE_64_ENCODE']

    return cls(producer, topic_prefix, logger, appids=my_appids,
               use_base64=use_base64)
#Example 14
#Project: orion-server Author: LINKIWI File: stream.py License: MIT License
def __init__(self, kafka_addr, kafka_topic):
    """
    Client for producing location messages to a Kafka broker.

    :param kafka_addr: Address to the Kafka broker.
    :param kafka_topic: Name of the Kafka topic to which messages should be published.
    """
    # Bypass event publishing entirely when no broker address is specified.
    producer_factory = (kafka_addr and kafka.KafkaProducer) or NoopProducer

    self.topic = kafka_topic
    self.producer = producer_factory(
        bootstrap_servers=kafka_addr,
        value_serializer=json.dumps,
    )
#Example 15
#Project: network-programmability-stream Author: dmfigol File: nc_dial_in_subscribe.py License: MIT License
def kafka_connect(self):
    self.producer = KafkaProducer(
        bootstrap_servers=KAFKA_URL,
        value_serializer=lambda v: json.dumps(v).encode('utf-8'))
#Example 16
#Project: sniffer Author: threathunterX File: sniffer_nebula_test.py License: Apache License 2.0
def __init__(self, bootstrap_servers, kafkatopic):
    self.kafkatopic = kafkatopic
    self.producer = KafkaProducer(bootstrap_servers=bootstrap_servers)
#Example 17
#Project: search-MjoLniR Author: wikimedia File: client.py License: MIT License
def _make_producer(client_config):
    return kafka.KafkaProducer(bootstrap_servers=client_config.brokers,
                               compression_type='gzip')
#Example 18
#Project: monasca-analytics Author: openstack File: kafkas.py License: Apache License 2.0
def sink_dstream(self, dstream):
    if self._producer is None:
        self._producer = kafka.KafkaProducer(
            bootstrap_servers="{0}:{1}".format(self._host, self._port))
    dstream.foreachRDD(self._persist)
#Example 19
#Project: Gather-Deployment Author: huseinzol05 File: producer.py License: MIT License
def connect_kafka_producer():
    print('connecting to kafka')
    _producer = None
    try:
        _producer = KafkaProducer(
            bootstrap_servers=['localhost:9092'], api_version=(0, 10)
        )
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    else:
        print('successfully connected to kafka')
    return _producer
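
Examples 19 and 20 pin api_version explicitly, which skips the broker version probe at connect time; left unset, kafka-python attempts to auto-detect the broker version, which costs a little extra on startup but avoids pinning mistakes. A sketch of both options:

from kafka import KafkaProducer

# Pinned: no version probe, but must match (or predate) the broker.
pinned = KafkaProducer(bootstrap_servers='localhost:9092', api_version=(0, 10))

# Auto-detected (the default): the client probes the broker on connect.
detected = KafkaProducer(bootstrap_servers='localhost:9092')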
#Example 20
#Project: Gather-Deployment Author: huseinzol05 File: main.py License: MIT License
def connect_kafka_producer():
    print('connecting to kafka')
    _producer = None
    try:
        _producer = KafkaProducer(
            bootstrap_servers=['kafka:9092'],
            api_version=(0, 10),
            partitioner=RoundRobinPartitioner(),
        )
    except Exception as ex:
        print('Exception while connecting Kafka')
        print(str(ex))
    else:
        print('successfully connected to kafka')
    return _producer
#Example 21
#Project: ztag Author: zmap File: stream.py License: Apache License 2.0
def __init__(self, logger=None, destination=None, *args, **kwargs):
    from kafka import KafkaProducer

    if destination == "full_ipv4":
        self.topic = "ipv4"
    elif destination == "alexa_top1mil":
        self.topic = "domain"
    else:
        raise Exception("invalid destination: %s" % destination)

    host = os.environ.get('KAFKA_BOOTSTRAP_HOST', 'localhost:9092')
    self.main_producer = KafkaProducer(bootstrap_servers=host)
    self.cert_producer = KafkaProducer(bootstrap_servers=host)
#Example 22
#Project: distributed_framework Author: ydf0509 File: kafka_publisher.py License: Apache License 2.0
def custom_init(self):
    self._producer = KafkaProducer(bootstrap_servers=frame_config.KAFKA_BOOTSTRAP_SERVERS)
    try:
        admin_client = KafkaAdminClient(bootstrap_servers=frame_config.KAFKA_BOOTSTRAP_SERVERS)
        admin_client.create_topics([NewTopic(self._queue_name, 16, 1)])
        # admin_client.create_partitions({self._queue_name: NewPartitions(total_count=16)})
    except TopicAlreadyExistsError:
        pass
    except Exception as e:
        self.logger.exception(e)
    atexit.register(self.close)  # if the producer is not closed before the program exits, an error is raised.
#Example 23
#Project: distributed_framework Author: ydf0509 File: log_manager000.py License: Apache License 2.0
def __init__(self, bootstrap_servers, **configs):
    """
    :param bootstrap_servers: Kafka bootstrap server addresses
    :param configs: extra keyword arguments passed through to KafkaProducer
    """
    logging.Handler.__init__(self)
    if not self.__class__.kafka_producer:
        very_nb_print('instantiating kafka producer')
        self.__class__.kafka_producer = KafkaProducer(bootstrap_servers=bootstrap_servers, **configs)
    t = Thread(target=self._do_bulk_op)
    t.setDaemon(True)
    t.start()
#Example 24
#Project: distributed_framework Author: ydf0509 File: kafka_consumer.py License: Apache License 2.0
def _shedual_task(self):
    self._producer = KafkaProducer(bootstrap_servers=frame_config.KAFKA_BOOTSTRAP_SERVERS)
    consumer = OfficialKafkaConsumer(self._queue_name, bootstrap_servers=frame_config.KAFKA_BOOTSTRAP_SERVERS,
                                     group_id='frame_group', enable_auto_commit=True)
    # REMIND: consumption here is highly concurrent (many threads, few partitions), so auto-commit is
    # enabled; otherwise multiple threads committing offsets for the same partition would race ahead of
    # one another and the offsets would become meaningless.
    # REMIND: if you need very high reliability and consistency, use rabbitmq instead.
    # REMIND: the upside is high concurrency. A topic can be re-read like flipping back through a book:
    # the offset can be reset at any time to re-consume. Multiple groups can consume the same topic,
    # and each group's offsets for that topic do not interfere with one another.
    for message in consumer:
        # note: message.value is raw bytes and must be decoded
        self.logger.debug(
            f'message from kafka topic [{message.topic}], partition {message.partition}: {message.value.decode()}')
        kw = {'consumer': consumer, 'message': message, 'body': json.loads(message.value)}
        self._submit_task(kw)
#Example 25
#Project: parsedmarc Author: domainaware File: kafkaclient.py License: Apache License 2.0
def __init__(self, kafka_hosts, ssl=False, username=None,
             password=None, ssl_context=None):
    """
    Initializes the Kafka client

    Args:
        kafka_hosts (list): A list of Kafka hostnames
        (with optional port numbers)
        ssl (bool): Use a SSL/TLS connection
        username (str): An optional username
        password (str): An optional password
        ssl_context: SSL context options

    Notes:
        ``use_ssl=True`` is implied when a username or password are
        supplied.

        When using Azure Event Hubs, the username is literally
        ``$ConnectionString``, and the password is the
        Azure Event Hub connection string.
    """
    config = dict(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                  bootstrap_servers=kafka_hosts,
                  client_id="parsedmarc-{0}".format(__version__))
    if ssl or username or password:
        config["security_protocol"] = "SSL"
        config["ssl_context"] = ssl_context or create_default_context()
    if username or password:
        config["sasl_plain_username"] = username or ""
        config["sasl_plain_password"] = password or ""
    try:
        self.producer = KafkaProducer(**config)
    except NoBrokersAvailable:
        raise KafkaError("No Kafka brokers available")
#Example 26
#Project: rasa_core Author: RasaHQ File: broker.py License: Apache License 2.0
def from_endpoint_config(cls, broker_config) -> Optional['KafkaProducer']:
    if broker_config is None:
        return None

    return cls(broker_config.url, **broker_config.kwargs)
#Example 27
#Project: rafiki Author: nginyc File: inference_cache.py License: Apache License 2.0
def __init__(self, hosts=os.environ.get('KAFKA_HOST', 'localhost'), ports=os.environ.get('KAFKA_PORT', '9092')):
    hostlist = hosts.split(',')
    portlist = ports.split(',')
    self.connection_url = [f'{host}:{port}' for host, port in zip(hostlist, portlist)]
    self.producer = KafkaProducer(bootstrap_servers=self.connection_url)
#Example 28
#Project: platypush Author: BlackLight File: kafka.py License: MIT License
def send_message(self, msg, topic, server=None, **kwargs):
    """
    :param msg: Message to send - as a string, bytes stream, JSON, Platypush message, dictionary, or anything that implements ``__str__``

    :param server: Kafka server name or address + port (format: ``host:port``). If None, then the default server will be used
    :type server: str
    """
    from kafka import KafkaProducer

    if not server:
        if not self.server:
            try:
                kafka_backend = get_backend('kafka')
                server = kafka_backend.server
            except:
                raise RuntimeError('No Kafka server nor default server specified')
        else:
            server = self.server

    if isinstance(msg, dict) or isinstance(msg, list):
        msg = json.dumps(msg)
    msg = str(msg).encode('utf-8')

    producer = KafkaProducer(bootstrap_servers=server)
    producer.send(topic, msg)
    producer.flush()

# vim:sw=4:ts=4:et:
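
Example 28 creates a new KafkaProducer for every message, which pays connection and metadata costs on each call. Producers are thread-safe, so a common alternative is to create one lazily and reuse it; a sketch of that pattern, with a hypothetical module-level cache:

from kafka import KafkaProducer

_producer = None

def get_producer(server='localhost:9092'):
    # Lazily create a single shared producer; KafkaProducer is thread-safe.
    global _producer
    if _producer is None:
        _producer = KafkaProducer(bootstrap_servers=server)
    return _producer

def send_message(msg: bytes, topic: str):
    producer = get_producer()
    producer.send(topic, msg)
    producer.flush()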
#Example 29
#Project: py-timeexecution Author: kpn-digital File: kafka.py License: Apache License 2.0
def producer(self):
    """
    :raises: kafka.errors.NoBrokersAvailable if the connection is broken
    """
    if self._producer:
        return self._producer

    self._producer = KafkaProducer(
        bootstrap_servers=self.hosts,
        value_serializer=lambda v: self._serializer_class().dumps(v).encode('utf-8'),
        **self._kwargs
    )
    return self._producer
#Example 30
#Project: stoq-plugins-public Author: PUNCH-Cyber File: kafka-queue.py License: Apache License 2.0
def _connect(self) -> None:
    """
    Connect to Kafka to publish a message
    """
    if not self.producer:
        self.producer = KafkaProducer(
            bootstrap_servers=self.servers, retries=self.retries
        )
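
Example 22 registers close() with atexit; more generally, a producer should be flushed and closed on shutdown so that records still buffered by linger_ms are not dropped. A minimal sketch:

import atexit
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092', linger_ms=500)

def _shutdown():
    # flush() waits for in-flight batches; close() tears down the I/O thread.
    producer.flush()
    producer.close(timeout=10)

atexit.register(_shutdown)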