Skip to content

Instantly share code, notes, and snippets.

@danthelion
Created October 20, 2022 11:05
Show Gist options
  • Save danthelion/6c3b09cc95cef62e48a2219a9a8e1b37 to your computer and use it in GitHub Desktop.
Save danthelion/6c3b09cc95cef62e48a2219a9a8e1b37 to your computer and use it in GitHub Desktop.
average age consumer
import json
import json
import os
from kafka import KafkaConsumer
BOOTSTRAP_SERVERS = (
"kafka:9092" if os.getenv("RUNTIME_ENVIRONMENT") == "DOCKER" else "localhost:9092"
)
TOPIC = "USERS"
def consume_messages():
consumer = KafkaConsumer(
TOPIC,
auto_offset_reset="earliest",
bootstrap_servers=BOOTSTRAP_SERVERS,
)
average_age = 0
user_count = 0
for msg in consumer:
idx, value = msg.key.decode("utf-8"), json.loads(msg.value)
average_age = (average_age * user_count + int(value["age"])) // (user_count + 1)
user_count += 1
print(
f"Received message {idx} -> {value} | Average age: {average_age} | User count: {user_count}"
)
if __name__ == "__main__":
consume_messages()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment