@nithyadurai87
Created November 23, 2020 17:11
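from pykafka import KafkaClient
from pymongo import MongoClient

# Connect to the Kafka broker and open the topic to read from.
K_client = KafkaClient(hosts='localhost:9092')
topic = K_client.topics['dataets']
# Iteration ends once no new message has arrived for 5 seconds.
consumer = topic.get_simple_consumer(consumer_timeout_ms=5000)

# Connect to MongoDB; documents go into the "data" collection of the "records" database.
M_client = MongoClient('localhost', 27017)
db = M_client.records
collection = db.data

# Sequence number used as the field name of each stored message.
counter = 1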
for message in consumer:
    # Store the decoded message text under its sequence number.
    document = {str(counter): message.value.decode("utf-8")}
    collection.insert_one(document)
    counter = counter + 1
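
The loop above stores each Kafka message as a one-field document whose key is the running counter and whose value is the decoded message text. A minimal sketch of that mapping, runnable without Kafka or MongoDB, is shown here. The helper names `to_document` and `to_documents` and the sample byte strings are hypothetical and not part of the original script.

```python
def to_document(counter, raw_value):
    """Build a one-field document: counter (as a string) -> decoded message text.

    Hypothetical helper that mirrors the dictionary built inside the consumer loop.
    """
    return {str(counter): raw_value.decode("utf-8")}


def to_documents(raw_values, start=1):
    """Number the raw message payloads from `start` and convert each to a document."""
    return [to_document(n, value) for n, value in enumerate(raw_values, start=start)]


# Sample payloads standing in for message.value (assumed to be UTF-8 bytes).
sample = [b'{"id": 1}', b"hello"]
docs = to_documents(sample)
print(docs)
```

Each message ends up under a different field name ("1", "2", ...), so the stored documents do not share a common key to query on.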