Skip to content

Instantly share code, notes, and snippets.

@epsi95
Last active November 6, 2020 18:48
Show Gist options
  • Save epsi95/afb7aff85159f5133b665c0cd8b1b082 to your computer and use it in GitHub Desktop.
Save epsi95/afb7aff85159f5133b665c0cd8b1b082 to your computer and use it in GitHub Desktop.
from confluent_kafka import Consumer, KafkaError
import logging
import sys
import json
import datetime
logging.basicConfig(format='%(process)d >> %(asctime)s - %(message)s', level=logging.INFO)
settings = {
'bootstrap.servers': 'localhost:9092',
'group.id': 'unit-convert-group',
'client.id': "unit-converter",
'enable.auto.commit': True,
'session.timeout.ms': 6000,
'default.topic.config': {'auto.offset.reset': 'smallest'}
}
c = Consumer(settings)
c.subscribe(['car_data'])
try:
while True:
msg = c.poll(0.1)
if msg is None:
continue
elif not msg.error():
# logging.info('Received message: {0}'.format(msg.value()))
data = json.loads(msg.value().decode("utf-8"))
time = datetime.datetime.fromtimestamp(data["timestamp"])
time_string = f"{time.day}/{time.month}/{time.year} {time.time().hour}:{time.time().minute}:{time.time().second}"
speed_in_meters_per_seconds = (data["speed"] * 1000) / 3600;
with open("log.txt", "a") as f:
f.write(time_string + " " + data["id"] + " " + str(speed_in_meters_per_seconds) + "\n")
elif msg.error().code() == KafkaError._PARTITION_EOF:
logging.info('End of partition reached {0}/{1}'
.format(msg.topic(), msg.partition()))
else:
logging.info('Error occured: {0}'.format(msg.error().str()))
except KeyboardInterrupt:
pass
finally:
c.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment