Skip to content

Instantly share code, notes, and snippets.

@amimimor
Last active August 29, 2015 14:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amimimor/9981ea80a87fb225c664 to your computer and use it in GitHub Desktop.
Save amimimor/9981ea80a87fb225c664 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
from kafka import SimpleProducer, KafkaClient, RoundRobinPartitioner, KeyedProducer
import os, time
# Connection settings are read from the environment; a missing variable
# raises KeyError here, before anything is sent.
broker = os.environ["broker"]
topic = os.environ["topic"]
parts = int(os.environ["parts"])

# To send messages synchronously
kafka = KafkaClient(str.format('{}:9092', broker))
producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner)

# A single timestamp (whole seconds, as a string) is the payload of every
# message sent below; process_message() later compares consumed values to it.
msg_time = str(int(time.time()))

try:
    # Note that the application is responsible for encoding messages to type bytes
    for part in range(parts):
        # The loop index is used as the message key.
        producer.send_messages(str.format(b'{}', topic),
                               str.format(b'{}', part),
                               str.format(b'{}', msg_time))
    # NOTE(review): the original indentation was lost, so it is unclear whether
    # this pause belonged inside the loop; it is assumed to be a single pause
    # after all messages are sent -- confirm.
    time.sleep(3)
finally:
    # Stop the producer even when a send fails.
    producer.stop()
from kafka import KafkaConsumer
# To consume messages
# The topic name is formatted first, then the keyword settings are gathered
# in a dict and unpacked into the constructor.
subscribed_topic = str.format(b'{}', topic)
consumer_options = {
    'bootstrap_servers': [str.format('{}:9092', broker)],
    'group_id': 'my_consumer_group',
    'auto_commit_enable': True,
    'auto_commit_interval_ms': 30,
    'auto_offset_reset': 'smallest',
}
consumer = KafkaConsumer(subscribed_topic, **consumer_options)
def process_message(message, expected=None):
    """Print a consumed message and check its value.

    Args:
        message: an object exposing ``topic``, ``partition``, ``offset``,
            ``key`` and ``value`` attributes.
        expected: the value the message should carry. When omitted, the
            module-level ``msg_time`` is used.

    Returns:
        255 when ``message.value`` differs from the expected value,
        otherwise None.
    """
    if expected is None:
        # Default to the timestamp stored in the module-level msg_time.
        expected = msg_time
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
    if message.value != expected:
        return 255
    return None
# Collect the outcome of process_message() for every fetched message.
results = []
# NOTE(review): no consumer timeout is configured where the consumer is built,
# so this iteration may block indefinitely once the topic is drained --
# confirm against the kafka-python version in use.
for m in consumer.fetch_messages():
    # append() instead of index assignment: assigning results[idx] on an
    # empty list raises IndexError on the very first message.
    results.append(process_message(m))
    consumer.task_done(m)
# NOTE(review): the original indentation was lost; the commit is assumed to
# run once after the loop rather than per message -- confirm.
consumer.commit()

# Exit with 255 if any message was flagged, 0 otherwise. SystemExit is raised
# directly so the script does not depend on the site-provided exit() helper.
raise SystemExit(255 if 255 in results else 0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment