Skip to content

Instantly share code, notes, and snippets.

@saran87
Last active December 11, 2015 00:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save saran87/244273071cb4518daafd to your computer and use it in GitHub Desktop.
Save saran87/244273071cb4518daafd to your computer and use it in GitHub Desktop.
import logging as log
import time
import multiprocessing
from pykafka import KafkaClient
from pykafka.common import OffsetType
# Configure the root logger to emit DEBUG-and-above records.
log.basicConfig(level=log.DEBUG)
# Module-level Kafka client; run_consumer() looks up its topic through it.
# The broker address is hard-coded to a local broker on port 9092.
# NOTE(review): this client is created at import time, i.e. in the parent
# process, but is used from the child process started by run_consumers().
# Confirm that a pykafka client remains usable across a fork.
client = KafkaClient(hosts="127.0.0.1:9092")
def run_consumer():
    """Consume messages from the ``test2`` topic and print each value.

    Looks up the topic on the module-level ``client``, opens a simple
    consumer (passing ``"test"`` as its first argument) and then loops on
    ``consume()``, printing the ``value`` of every truthy message returned.

    The loop ends when consuming or printing raises, or when the process is
    interrupted. Failures are logged instead of being silently discarded,
    and the consumer is always stopped on the way out.
    """
    topic = client.topics['test2']
    log.info("Got the topic")
    consumer = topic.get_simple_consumer("test")
    log.info("initialized consumer")
    try:
        # NOTE(review): get_simple_consumer may already start the consumer
        # by default; confirm whether this explicit start() is still needed.
        consumer.start()
        # Short pause before the first consume() call, kept from the
        # original script (presumably to let the consumer finish starting).
        time.sleep(2)
        while True:
            try:
                message = consumer.consume()
                if message:
                    # Parenthesised single-argument form prints the same
                    # text on Python 2 and is valid syntax on Python 3.
                    print(message.value)
            except KeyboardInterrupt:
                log.info("consumer interrupted; stopping")
                break
            except Exception:
                # Still ends the loop on any error, but records the
                # traceback so the reason for stopping is visible.
                log.exception("consumer failed; stopping")
                break
    finally:
        # Release the consumer's resources however the loop ended.
        consumer.stop()
def run_consumers():
    """Run ``run_consumer`` in a child process and wait for it to exit.

    Spawns a single process named ``"consumer"``, marks it explicitly as
    non-daemonic, starts it, and blocks until it terminates.
    """
    worker = multiprocessing.Process(name="consumer", target=run_consumer)
    # Explicitly non-daemonic, so the child is not terminated automatically
    # when the parent process exits.
    worker.daemon = False
    worker.start()
    worker.join()
# Script entry point: only launch the consumer process when this file is
# executed directly, not when it is imported (including the re-import that
# multiprocessing performs in the child on platforms that spawn).
if __name__ == "__main__":
    run_consumers()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment