Skip to content

Instantly share code, notes, and snippets.

@jar349
Created January 9, 2018 16:23
Show Gist options
  • Save jar349/c1fa8d84bab92c8a6c4f427ffa27a4a0 to your computer and use it in GitHub Desktop.
python-kafka test script
"""
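kafka.py - tests the local Kafka broker by producing one message and
consuming it back.

NOTE(review): if this script is really saved as kafka.py, the
`from kafka import ...` line below may resolve to this file instead of the
kafka-python package; confirm the on-disk name (e.g. kafka_test.py).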
"""
import datetime
import json
import os
from kafka import KafkaProducer, KafkaConsumer, TopicPartition
# Broker location; each part can be overridden through the environment.
kafka_host = os.getenv('KAFKA_HOST', 'localhost')
kafka_port = os.getenv('KAFKA_PORT', '9092')
# "host:port" address used as the bootstrap server by both Kafka clients.
URL = ':'.join((kafka_host, kafka_port))
# Topic the test message is produced to and consumed from.
TOPIC = os.getenv('KAFKA_TOPIC', 'test')
# Client id passed to the Kafka producer and consumer.
CLIENT_ID = os.getenv('KAFKA_CLIENT_ID', 'tester')
class JsonSerializer(object):
    """
    Converts values to and from UTF-8 encoded JSON bytes
    """

    @staticmethod
    def serialize(value):
        """
        Returns value dumped as JSON and encoded as UTF-8 bytes
        """
        return json.dumps(value).encode('utf-8')

    @staticmethod
    def deserialize(value):
        """
        Returns the object parsed from UTF-8 encoded JSON bytes

        A None input (a record with a null value) is returned as None
        instead of failing on None.decode.
        """
        if value is None:
            return None
        return json.loads(value.decode('utf-8'))
class Tester(object):
    """
    Component that provides a produce and consume behavior
    """

    def produce(self):
        """
        Produces a message on kafka with the current datetime

        Returns the metadata resolved from the send future; consume() uses
        its topic, partition and offset to locate the message again.
        Any error raised while sending or waiting (up to 10 seconds) for
        the result propagates to the caller after the producer is closed.
        """
        producer = KafkaProducer(
            bootstrap_servers=[URL],
            client_id=CLIENT_ID,
            value_serializer=JsonSerializer.serialize,
            api_version=(0, 10, 1),
            batch_size=0,
            acks='all'
        )
        try:
            message = datetime.datetime.now().isoformat()
            future_metadata = producer.send(TOPIC, message)
            producer.flush()
            metadata = future_metadata.get(timeout=10)
            print('Produced this message (offset={}): {}'.format(metadata.offset, message))
            return metadata
        finally:
            # Close the producer even when send/flush/get raises.
            producer.close()

    def consume(self, metadata):
        """
        Consumes the message described by metadata

        metadata is the value returned by produce(); its topic, partition
        and offset select the single record to read.
        """
        consumer = KafkaConsumer(
            bootstrap_servers=[URL],
            client_id=CLIENT_ID,
            value_deserializer=JsonSerializer.deserialize,
            api_version=(0, 10, 1)
        )
        try:
            # Read from the partition the record was actually written to
            # rather than assuming partition 0.
            partition = TopicPartition(metadata.topic, metadata.partition)
            consumer.assign([partition])
            consumer.seek(partition, metadata.offset)
            # NOTE(review): no consumer_timeout_ms is set, so this presumably
            # blocks until a record arrives - confirm that is acceptable.
            message = next(consumer)
            print('Consumed this message (offset={}): {}'.format(metadata.offset, message))
        finally:
            # Close the consumer even when assign/seek/next raises.
            consumer.close()
def main():
    """
    Script entrypoint: produce one message, then consume it back
    """
    kafka_tester = Tester()
    # The metadata returned by produce() tells consume() where to read.
    kafka_tester.consume(kafka_tester.produce())


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment