Skip to content

Instantly share code, notes, and snippets.

@asdfsx
Created December 14, 2013 07:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save asdfsx/7956368 to your computer and use it in GitHub Desktop.
Save asdfsx/7956368 to your computer and use it in GitHub Desktop.
import logging
import threading
import multiprocessing
import time
from kafka.client import KafkaClient
from kafka.common import FetchRequest, ProduceRequest
from kafka.consumer import SimpleConsumer
from kafka.producer import SimpleProducer
def consume_example(processid,array):
client = KafkaClient("localhost", 9092)
consumer = SimpleConsumer(client,'test-consumer-group', "sztest2")
#while True:
for message in consumer:
print processid, message
array[processid] += 1
def monitor(array):
runningtime = 0
while True:
totalconsume = 0
for i in range(len(array)):
totalconsume += array[i]
print "running time: ", str(runningtime), "totalconsume: ", str(totalconsume)
time.sleep(1)
runningtime += 1
def main():
processNum = 2
array = multiprocessing.Array('i',processNum)
record = []
for i in range(processNum):
process = multiprocessing.Process(target = consume_example,args = (i,array))
process.start()
record.append(process)
#tmonitor = threading.Thread(target = monitor,args = (array,))
#tmonitor.setDaemon(True)
#tmonitor.start()
for process in record:
process.join()
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment