@bholagabbar
Created May 20, 2020 02:40
Read from Kafka
from json import loads

from kafka import KafkaConsumer

# Create a consumer to read data from Kafka
consumer = KafkaConsumer(
    'numtest',
    bootstrap_servers=['localhost:9092'],
    # Read from the start of the topic; the default is 'latest'
    auto_offset_reset='earliest'
)

# No group_id is set, so no offsets are committed:
# every run prints all messages from the start of the topic again
for message in consumer:
    # message.value is bytes by default, so decode the JSON explicitly
    print(loads(message.value))
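The loop above decodes each value by hand because kafka-python returns raw bytes by default. As a minimal sketch of an alternative, assuming the messages are UTF-8 encoded JSON, the decoding can be handed to the consumer through its `value_deserializer` parameter. The names `decode_json` and `make_consumer` are hypothetical helpers introduced here for illustration; they are not part of the original snippet.

```python
from json import loads


def decode_json(raw):
    """Turn the raw bytes of a message value into a Python object.

    Assumes UTF-8 encoded JSON. A value of None (a message without a
    payload) is passed through unchanged.
    """
    if raw is None:
        return None
    return loads(raw.decode('utf-8'))


def make_consumer(topic='numtest', servers=('localhost:9092',)):
    """Build a consumer whose message.value is already decoded (hypothetical helper)."""
    # Imported lazily so decode_json can be used without kafka-python installed
    from kafka import KafkaConsumer

    return KafkaConsumer(
        topic,
        bootstrap_servers=list(servers),
        auto_offset_reset='earliest',
        # Applied to each raw value before the message is yielded
        value_deserializer=decode_json,
    )
```

With a consumer built this way, the loop body would reduce to `print(message.value)`.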
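# Alternative consumer that belongs to a consumer group.
# Run this separately: the loop above blocks and never returns.
consumer = KafkaConsumer(
    'numtest',
    bootstrap_servers=['localhost:9092'],
    auto_offset_reset='earliest',
    # Consumer group id
    group_id='numtest-group-<andrew_id>',
    # Commit consumed offsets automatically in the background
    enable_auto_commit=True,
    # Interval between automatic offset commits, in milliseconds
    auto_commit_interval_ms=1000
)

# Offsets are committed for the group, so a restarted consumer resumes
# where the group left off: old messages print once, then only new ones
for message in consumer:
    print(loads(message.value))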
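The consumer above relies on automatic commits, which record offsets on a timer whether or not the message was fully handled. As a hedged sketch of a different approach (manual commits, not what the snippet above does), the function below commits only after each message has been processed. It assumes `consumer` is a kafka-python `KafkaConsumer` created with a `group_id` and `enable_auto_commit=False`; `consume_with_manual_commit`, `handle`, and `max_messages` are hypothetical names used only for illustration.

```python
from json import loads


def consume_with_manual_commit(consumer, handle, max_messages=None):
    """Process messages and commit offsets only after handling succeeds.

    consumer: assumed to be a KafkaConsumer with a group_id and
              enable_auto_commit=False
    handle:   hypothetical callback that receives each decoded value
    """
    processed = 0
    for message in consumer:
        # If handle raises, commit() is skipped and the offset stays uncommitted
        handle(loads(message.value))
        # Synchronously commit the offsets consumed so far
        consumer.commit()
        processed += 1
        if max_messages is not None and processed >= max_messages:
            break
    return processed
```

Committing after every message costs one round trip to the broker each time; committing once per batch is a common trade-off when throughput matters more than minimizing reprocessing.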