Skip to content

Instantly share code, notes, and snippets.

@cipri7329
Last active September 16, 2016 14:03
Show Gist options
  • Save cipri7329/836c79521dfc80449c90e77d79766cc4 to your computer and use it in GitHub Desktop.
Save cipri7329/836c79521dfc80449c90e77d79766cc4 to your computer and use it in GitHub Desktop.
python kafka producer
import time
from kafka import KafkaProducer
import json
import base64
KAFKA_TOPIC = "scraped-data"
KAFKA_HOST = "localhost:9092"
# strftime pattern used for the human-readable timestamp attached to each row.
TIME_FORMAT = "%Y%m%d-%H%M%S"


def current_htime():
    """Return the current local time formatted with TIME_FORMAT."""
    return time.strftime(TIME_FORMAT, time.localtime())


def encode_field(text):
    """Return *text* base64-encoded as a text string.

    The value is encoded to UTF-8 bytes before base64 encoding and the result
    is decoded back to text, so the function works on both Python 2 and
    Python 3 and the returned value can be passed to ``json.dumps``.
    """
    return base64.b64encode(text.encode("utf-8")).decode("ascii")


def build_record(index, htime):
    """Build the message payload for row number *index*.

    Args:
        index: Row number; used in the row key and in the column values.
        htime: Already formatted timestamp string to attach to the row.

    Returns:
        A dict with the keys ``rowkey``, ``col1``, ``col2``, ``col3`` and
        ``htime``; every value is base64-encoded text.
    """
    # The three columns carry the same sample value, so encode it once.
    col_value = encode_field("col_value_" + str(index))
    return {
        "rowkey": encode_field("msg_" + str(index)),
        "col1": col_value,
        "col2": col_value,
        "col3": col_value,
        "htime": encode_field(htime),
    }


def report_send_failure(exc):
    """Print a delivery error reported by the producer for a sent message."""
    print(exc)


def main(message_count=3):
    """Send *message_count* sample rows to KAFKA_TOPIC on KAFKA_HOST.

    Each row is printed before it is handed to the producer. Failures are
    printed rather than raised, so one bad message does not stop the run.
    """
    producer = KafkaProducer(
        bootstrap_servers=KAFKA_HOST,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    try:
        # Single-argument print(...) calls behave the same on Python 2 and 3.
        print("start producing to: " + KAFKA_TOPIC)
        print(current_htime())
        for index in range(message_count):
            htime = current_htime()
            print(htime)
            data = build_record(index, htime)
            print(data)
            try:
                # send() is asynchronous: the try/except only catches errors
                # raised while queuing (e.g. serialization); delivery errors
                # arrive later through the errback.
                future = producer.send(KAFKA_TOPIC, data)
                future.add_errback(report_send_failure)
            except Exception as ex:
                print(ex)
        # Block until every queued message has been handed to the broker.
        producer.flush()
    finally:
        # Release the producer's network resources even if the run failed.
        producer.close()
    print("kafka-topic producer finished")


if __name__ == "__main__":
    main()
@cipri7329
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment