Instantly share code, notes, and snippets.

Embed
What would you like to do?
Consumer that notifies if calories count cross the threshold
import json
from time import sleep
from kafka import KafkaConsumer
if __name__ == '__main__':
parsed_topic_name = 'parsed_recipes'
# Notify if a recipe has more than 200 calories
calories_threshold = 200
consumer = KafkaConsumer(parsed_topic_name, auto_offset_reset='earliest',
bootstrap_servers=['localhost:9092'], api_version=(0, 10), consumer_timeout_ms=1000)
for msg in consumer:
record = json.loads(msg.value)
calories = int(record['calories'])
title = record['title']
if calories > calories_threshold:
print('Alert: {} calories count is {}'.format(title, calories))
sleep(3)
if consumer is not None:
consumer.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment