@merlin-quix
Created January 22, 2023 13:27
import datetime as dt
import json

from kafka import KafkaConsumer, KafkaProducer  # kafka-python package

#1 - Initialize a Consumer for reading the email messages from the emails topic
consumer = KafkaConsumer(
    group_id="python-consumer",
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',  # Start at the earliest offset when the group has no committed offset
    enable_auto_commit=False,  # Never commit offsets, so messages stay 'unread' for this group
    value_deserializer=lambda x: json.loads(x))  # JSON bytes -> Python object
consumer.subscribe(["emails"])
print(f'Initialized Kafka consumer at {dt.datetime.utcnow()}')

#2 - Initialize a Producer for sending predictions to the predictions topic
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # Serialize values to UTF-8 JSON; default=str stringifies non-JSON types such as datetimes
    value_serializer=lambda x: json.dumps(x, default=str).encode('utf-8'))
print(f'Initialized Kafka producer at {dt.datetime.utcnow()}')
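
The serializer and deserializer configured above can be checked without a running broker. The following is a minimal sketch using only the standard library; the function names `serialize` and `deserialize` and the payload fields (`email_id`, `label`, `scored_at`) are assumptions made for illustration and are not part of the original snippet.

```python
import datetime as dt
import json


def serialize(value):
    # Same logic as the producer's value_serializer: JSON text encoded as UTF-8 bytes.
    # default=str converts values json cannot encode (e.g. datetime) to strings.
    return json.dumps(value, default=str).encode('utf-8')


def deserialize(raw):
    # Same logic as the consumer's value_deserializer: JSON bytes back to a Python object.
    return json.loads(raw)


# Hypothetical prediction payload; the field names are illustrative only.
prediction = {
    "email_id": 42,
    "label": "spam",
    "scored_at": dt.datetime(2023, 1, 22, 13, 27),
}

raw = serialize(prediction)   # bytes, the form written to the topic
restored = deserialize(raw)   # dict, the form a consumer would see

print(type(raw).__name__)     # bytes
print(restored["scored_at"])  # 2023-01-22 13:27:00
```

Note that the round trip is lossy for types handled by `default=str`: the datetime comes back as a plain string, so a downstream consumer would need to parse it again if it requires a datetime object.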