Skip to content

Instantly share code, notes, and snippets.

@dgadiraju
Created March 11, 2018 09:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save dgadiraju/3c75ef0e3b077508b52e324c0d487ce8 to your computer and use it in GitHub Desktop.
Save dgadiraju/3c75ef0e3b077508b52e324c0d487ce8 to your computer and use it in GitHub Desktop.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
import sys
from operator import add

# Index of the space-separated field that the pipeline treats as the request
# path. NOTE(review): presumably an Apache-style access log where field 6 is
# the URL path of the request line -- confirm against the log producer.
PATH_FIELD_INDEX = 6


def extract_department(msg):
    """Return the department name from one log line, or None if it has none.

    The line is split on single spaces and field ``PATH_FIELD_INDEX`` is split
    on "/". When the second "/"-segment is "department", the third segment is
    returned (e.g. "/department/fitness/categories" -> "fitness").

    Lines that are too short, or whose path has no segment after
    "department", yield None instead of raising IndexError. A single
    malformed line therefore cannot fail the whole streaming micro-batch.

    :param msg: one line of text received from the socket stream.
    :returns: the department segment (an empty string for a path such as
        "/department/", as before), or None when the line does not refer to
        a department.
    """
    fields = msg.split(" ")
    if len(fields) <= PATH_FIELD_INDEX:
        return None
    segments = fields[PATH_FIELD_INDEX].split("/")
    if len(segments) < 3 or segments[1] != "department":
        return None
    return segments[2]


def main(argv=None):
    """Count department page hits per streaming batch and save the counts.

    Usage: <script> <hostname> <port> <output-prefix>

    Reads text lines from a socket at ``hostname:port``, extracts the
    department name from each line with :func:`extract_department`, and
    counts occurrences per department within each 30-second batch. Each
    batch's ``(department, count)`` pairs are written via
    ``DStream.saveAsTextFiles`` using ``output-prefix``.
    """
    argv = sys.argv if argv is None else argv
    if len(argv) < 4:
        sys.exit("Usage: <script> <hostname> <port> <output-prefix>")
    hostname = argv[1]
    port = int(argv[2])
    outputPrefix = argv[3]

    # NOTE(review): the "yarn-client" master URL is deprecated in newer Spark
    # releases in favour of master "yarn" with client deploy mode -- confirm
    # against the cluster's Spark version. A master set here overrides
    # spark-submit's --master.
    conf = (
        SparkConf()
        .setAppName("Streaming Department Count")
        .setMaster("yarn-client")
    )
    sc = SparkContext(conf=conf)
    ssc = StreamingContext(sc, 30)  # batch interval in seconds

    messages = ssc.socketTextStream(hostname, port)
    departmentCount = (
        messages
        .map(extract_department)
        # Drop lines that are not department hits; keep "" to match the
        # original behaviour for paths like "/department/".
        .filter(lambda name: name is not None)
        .map(lambda name: (name, 1))
        .reduceByKey(add)
    )
    departmentCount.saveAsTextFiles(outputPrefix)

    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment