Skip to content

Instantly share code, notes, and snippets.

@dgadiraju
Created March 11, 2018 09:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save dgadiraju/1fa4ba1ec37faa468a8468bfa4ad6064 to your computer and use it in GitHub Desktop.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from operator import add
import sys

# Streaming job: count web-log hits per department arriving on a Kafka
# topic, saving one text-file set of (department, count) pairs per batch.
#
# Usage: spark-submit <this_script> <output_path_prefix>
#
# Log-format assumption (inferred from the parsing below — confirm against
# the producer): the request URL is the 7th space-separated field of each
# line and looks like "/department/<name>/...".
URL_FIELD = 6          # index of the request URL in a space-split log line
BATCH_INTERVAL = 30    # micro-batch length in seconds

# Fail fast with a usage message instead of an IndexError deep in the job.
if len(sys.argv) < 2:
    sys.exit("Usage: StreamingDepartmentCount.py <output_path_prefix>")
outputPrefix = sys.argv[1]

conf = SparkConf(). \
    setAppName("Streaming Department Count"). \
    setMaster("yarn-client")
sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, BATCH_INTERVAL)

topics = ["fkdemodg"]
brokerList = {"metadata.broker.list": "nn01.itversity.com:6667,nn02.itversity.com:6667,rm01.itversity.com:6667"}

# Direct (receiverless) Kafka stream: each record is a (key, value) pair;
# the log line itself is the value.
directKafkaStream = KafkaUtils.createDirectStream(ssc, topics, brokerList)
messages = directKafkaStream.map(lambda msg: msg[1])

# Keep only hits whose URL path starts with "/department/...".
departmentMessages = messages. \
    filter(lambda msg: msg.split(" ")[URL_FIELD].split("/")[1] == "department")

# Emit (department_name, 1) and sum the ones per batch.
departmentNames = departmentMessages. \
    map(lambda msg: (msg.split(" ")[URL_FIELD].split("/")[2], 1))
departmentCount = departmentNames. \
    reduceByKey(add)

# One output directory per batch: <outputPrefix>-<batch_timestamp>.
departmentCount.saveAsTextFiles(outputPrefix)
ssc.start()
ssc.awaitTermination()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment