Skip to content

Instantly share code, notes, and snippets.

@gleicon
Last active April 22, 2021 20:28
Show Gist options
  • Save gleicon/189ce5cef3d8d1ea3dba509f395a578f to your computer and use it in GitHub Desktop.
es -> kafka: reinject archived Elasticsearch documents into a Kafka topic for replay.
import xmltodict
import json
import sys
import getopt
import time
from datetime import datetime
import collections
from elasticsearch import Elasticsearch
import logging
from kafka import KafkaProducer
# pip install kafka-python elasticsearch python-snappy
# --- Logging: debug-level console logger shared by the whole script. ---
logger = logging.getLogger("kafka_reinject")
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)

# --- Connection settings for the one-off archive reinjection job. ---
ES_SERVER = "internal-elasticsearch-prd-archive-2002760023"
ES_INDEX = "tracker_archive-2017.03.01"
KAFKA_BROKERS = ["event-hub-1", "event-hub-2", "event-hub-3"]
KAFKA_TOPIC = "tracker.replaydata.v1"

ES = None
KAFKA = None  # kept for backward compatibility; the producer lives in KAFKA_PRODUCER
# fixed: pre-declare KAFKA_PRODUCER so the `KAFKA_PRODUCER is None` check in
# main() always has a bound name to test.
KAFKA_PRODUCER = None

# Build the Elasticsearch client; abort the script if construction fails.
try:
    ES = Elasticsearch(hosts=[ES_SERVER])
    logger.info(ES)
except Exception as e:  # fixed: "except Exception, e" is Python-2-only syntax
    logger.error("Exception ES: %s" % e)
    sys.exit(-1)

# Build the Kafka producer: JSON-serialized values, snappy-compressed batches,
# up to 3 send retries.
try:
    KAFKA_PRODUCER = KafkaProducer(
        bootstrap_servers=KAFKA_BROKERS,
        client_id="kafka_reinject",
        compression_type="snappy",
        retries=3,
        value_serializer=lambda m: json.dumps(m).encode('utf-8'),
    )
    logger.info(KAFKA_PRODUCER)
except Exception as e:  # fixed: Python-2-only except syntax
    logger.error("Exception Kafka: %s", e)
    sys.exit(-1)
def index_on_es(obj):
    """Index one tracker document into the "produtos" ES index.

    The product id is looked up under several possible item keys; documents
    with no id get a surrogate, monotonically increasing id.  Errors are
    logged and swallowed (best-effort indexing), never raised.

    obj: dict parsed from XML/JSON -- presumably an xmltodict result given
    the "item:item" style keys; TODO confirm against the producer.
    """
    try:
        pid = None
        # fixed: dict.has_key() is Python-2-only; use the `in` operator.
        if "item:item" in obj:
            pid = obj["item:item"]["item:itemId"]
        elif "item1:item" in obj:
            pid = obj["item1:item"]["item:itemId"]
        elif "item:itemId" in obj:
            pid = obj["item:itemId"]
        else:
            # fixed: surrogate_counter was read before ever being assigned
            # (NameError on this path); keep the counter on the function
            # object so it survives across calls without a module global.
            pid = getattr(index_on_es, "surrogate_counter", 0) + 1
            index_on_es.surrogate_counter = pid
            logger.error("no item id, surrogate key: %d", pid)
        # fixed: lowercase `es` was undefined -- the module-level client is ES.
        r = ES.index(index="produtos", doc_type="produto", id=pid, body=json.dumps(obj))
        logger.info(r)
    except Exception as e:  # fixed: Python-2-only except syntax
        logger.error("Exception index on es: %s" % e)
def _to_array(v):
    """Mirror the JS `toArray`: pass lists through, wrap anything else."""
    return v if isinstance(v, list) else [v]


def doc_adapter_vtnc(doc):
    """Massage an archived ES hit, in place, into the consumer's shape.

    Port of the JS massagers:
        device_id:    d => d.device ? d.device.id : undefined
        referrer:     d => d.referer
        creative_id / placement_id / site_id: d => toArray(d.<field>)

    Returns the same (mutated) dict.
    """
    # NOTE(review): the JS reads d.device.id, but this port expects a flat
    # device_id already present on the doc -- confirm against the source index.
    doc["device_id"] = doc.get("device_id", "undefined")
    if doc.get("referer", None) is not None:
        doc["referrer"] = doc["referer"]
    # fixed: the old code wrapped unconditionally, so a value that was
    # already a list became a double-wrapped [[...]]; match the JS toArray
    # and leave lists untouched.
    doc["creative_id"] = _to_array(doc.get("creative_id", ""))
    doc["placement_id"] = _to_array(doc.get("placement_id", ""))
    doc["site_id"] = _to_array(doc.get("site_id", ""))
    return doc
def main():
    """Pull one page of hits from the ES archive index and replay them to Kafka.

    Reads at most 10 documents (a test batch), adapts each with
    doc_adapter_vtnc, and sends it synchronously to KAFKA_TOPIC, printing
    the broker-assigned offset.  Per-document Kafka errors are logged and
    the loop continues.
    """
    if ES is None or KAFKA_PRODUCER is None:
        # fixed: bare `print` statements are Python-2-only; use the call form.
        print("No ES or KAFKA ready")
        sys.exit(-1)

    # Small test page: first 10 docs of the archive, match-all query.
    res = ES.search(index=ES_INDEX, body={"query": {"match_all": {}}}, size=10, from_=0)
    for hit in res["hits"]["hits"]:
        hs = hit["_source"]
        hs["_type"] = hit["_type"]
        print("Injecting from %s - campaign_id: %s - visitor_id: %s - type: %s"
              % (hs["timestamp"], hs["campaign_id"], hs["visitor_id"], hs["_type"]))
        try:
            doc = doc_adapter_vtnc(hs)
            fs = KAFKA_PRODUCER.send(KAFKA_TOPIC, doc)  # returns a future
            record = fs.get(timeout=60)  # block until the broker acks
            print(record.offset)
        except Exception as ke:  # fixed: Python-2-only except syntax
            logger.error("Kafka error: %s" % ke)
        # print(json.dumps(hit["_source"]))
    KAFKA_PRODUCER.flush()
    time.sleep(60)  # test to push to kafka


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment