Last active
November 23, 2015 22:59
-
-
Save rwalk/1f0598b7e46b13bf9ca0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python3 | |
''' | |
Put documents from the stream into Kafka | |
''' | |
import argparse | |
from kafka import SimpleProducer, KafkaClient | |
from time import sleep | |
def chunk_iterable(A, n):
    '''Yield the items of A grouped into consecutive lists of at most n items.

    Every item of A appears in exactly one chunk, in its original order.
    All chunks hold n items except possibly the last one, which holds
    whatever remains. An empty iterable yields nothing.

    A   iterable
    n   int (or anything int() accepts), size of chunk; must be >= 1

    Raises ValueError (on first iteration) if n is smaller than 1.
    '''
    # Coerce so that a value read from the command line as a string still works.
    n = int(n)
    if n < 1:
        raise ValueError("chunk size must be at least 1, got {0}".format(n))

    chunk = []
    for v in A:
        # Append first, then test for fullness, so the item that completes
        # (or follows) a chunk is never discarded.
        chunk.append(v)
        if len(chunk) >= n:
            yield chunk
            chunk = []
    # Flush the trailing partial chunk, if any.
    if chunk:
        yield chunk
if __name__ == "__main__":
    # Command-line interface: a data file, a Kafka host and a topic are
    # required; port, chunk size and inter-chunk delay are optional.
    # Numeric options are declared with type=int so that values given on the
    # command line are parsed (and validated) as integers instead of being
    # passed around as strings.
    parser = argparse.ArgumentParser(description="Feed Kafka a stream from a file")
    parser.add_argument("file", help="A file of data, one datum per line")
    parser.add_argument("host", help="Public IP address of a Kafka node")
    parser.add_argument("topic", help="Kafka topic to feed")
    # NOTE: the help text mentions zookeeper, but this value is used as the
    # port of the Kafka node in the client connection string below.
    parser.add_argument("-p", "--port", type=int, default=9092, help="port for zookeeper, default 9092")
    parser.add_argument("-c", "--chunksize", type=int, default=100, help="Number of messages to send at one time, default 100")
    parser.add_argument("-d", "--delay", type=int, default=0, help="Delay in ms between shipment of chunks to Kafka, default 0")
    args = parser.parse_args()

    # Get a client and a producer bound to it.
    print("Connecting to Kafka node {0}:{1}".format(args.host, args.port))
    kafka = KafkaClient("{0}:{1}".format(args.host, args.port))
    producer = SimpleProducer(kafka)

    # The delay is given in milliseconds; sleep() expects seconds.
    delay_seconds = args.delay / 1000.0

    # Read through the file and send messages to Kafka in chunks. The file is
    # opened in binary mode, so each message is the raw bytes of one line.
    with open(args.file, "rb") as f:
        for chunk in chunk_iterable(f, args.chunksize):
            print("Sending {0} messages to topic {1} on {2}".format(len(chunk), args.topic, args.host))
            producer.send_messages(args.topic, *chunk)
            sleep(delay_seconds)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment