@ksingh7
Created October 1, 2021 21:48
Kafka Python producer and consumer example that uses SSL and ca.crt files
from kafka import KafkaProducer, KafkaConsumer
import json
from bson import json_util  # ships with pymongo; lets json.dumps handle BSON types such as datetime

bootstrap_server = 'my-cluster-kafka-route-bootstrap-nestjs-testing.apps.ocp.ceph-s3.com:443'

print("Producing messages to Kafka topic ...")
# ssl_cafile points at the CA certificate used to verify the broker's TLS certificate
producer = KafkaProducer(bootstrap_servers=bootstrap_server, ssl_cafile='ca.crt', security_protocol="SSL")
for i in range(10):
    message = {'value': i}
    producer.send('my-topic', json.dumps(message, default=json_util.default).encode('utf-8'))
# send() is asynchronous; flush() blocks until the buffered messages have actually been sent
producer.flush()

print("Consuming messages from Kafka topic ...")
# consumer_timeout_ms=10000 ends the loop below after 10 seconds without a new message
consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=bootstrap_server, ssl_cafile='ca.crt', security_protocol="SSL", consumer_timeout_ms=10000, enable_auto_commit=True)
for message in consumer:
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: value=%s" % (message.topic, message.partition, message.offset, message.value))
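
The example above encodes each message by hand and prints the raw bytes on the consumer side. As a possible variation, the encoding and decoding can be handed to kafka-python through its `value_serializer` and `value_deserializer` options. The sketch below is not part of the original gist: the helper names `encode_value`, `decode_value`, and `build_clients` are hypothetical, it uses plain `json` instead of `json_util`, and it assumes the same `ca.crt` file and SSL settings as above.

```python
import json


def encode_value(payload):
    """Serialize a JSON-compatible object to UTF-8 bytes (hypothetical helper)."""
    return json.dumps(payload).encode("utf-8")


def decode_value(raw):
    """Turn the raw bytes of message.value back into a Python object (hypothetical helper)."""
    return json.loads(raw.decode("utf-8"))


def build_clients(bootstrap_server, topic, cafile="ca.crt"):
    """Create an SSL producer/consumer pair that (de)serializes JSON automatically.

    Assumes kafka-python is installed and the broker is reachable; the import is
    done lazily so the two helpers above can be used and tested without a cluster.
    """
    from kafka import KafkaProducer, KafkaConsumer

    producer = KafkaProducer(
        bootstrap_servers=bootstrap_server,
        security_protocol="SSL",
        ssl_cafile=cafile,
        value_serializer=encode_value,  # applied to every value passed to send()
    )
    consumer = KafkaConsumer(
        topic,
        group_id="my-group",
        bootstrap_servers=bootstrap_server,
        security_protocol="SSL",
        ssl_cafile=cafile,
        value_deserializer=decode_value,  # message.value arrives already decoded
        consumer_timeout_ms=10000,
    )
    return producer, consumer


if __name__ == "__main__":
    # Round trip without a broker: dict -> bytes -> dict
    print(decode_value(encode_value({"value": 3})))
```

With clients built this way, `producer.send('my-topic', {'value': i})` accepts the dict directly, and `message.value` in the consumer loop is a dict rather than bytes.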