@picadoh
Created September 17, 2016 16:28
Spark Stateful Streaming with Python - Example that takes text from input network socket and prints the accumulated count for each word
# Spark Stateful Streaming with Python
# Takes text from input network socket and prints the accumulated count for each word
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
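# define the update function: for each word it receives the word's new counts
# from the current batch (currentCount) and its previous running total
# (countState, which is None the first time the word is seen)
def updateTotalCount(currentCount, countState):
    if countState is None:
        countState = 0
    # add the new counts to the running total
    return sum(currentCount, countState)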
# create the Spark and streaming contexts (local mode, 10-second batch interval)
sc = SparkContext("local[*]", "StreamWordCounter")
ssc = StreamingContext(sc, 10)
# set the checkpoint directory (stateful operations such as updateStateByKey require one)
ssc.checkpoint("/tmp")
# read text from input socket
text = ssc.socketTextStream("localhost", 9999)
# count words in each batch: split lines on whitespace, pair each word with 1,
# then sum the pairs per word
countStream = text.flatMap(lambda line: line.split())\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda a, b: a + b)
# update total count for each key
totalCounts = countStream.updateStateByKey(updateTotalCount)
# print the first 10 (word, total) pairs of each batch
totalCounts.pprint()
# start the streaming context
ssc.start()
ssc.awaitTermination()
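To see what `updateStateByKey(updateTotalCount)` does across batches without a running cluster, the following pure-Python sketch models the pipeline locally. The helper names (`count_batch`, `update_state_by_key`, `run_batches`) are hypothetical, and the model is deliberately simplified (no partitions, checkpoints, or timeouts). It only mirrors the documented rule that the update function is applied to every key that already has state or has new data, receives an empty list when the key has no new data in a batch, and drops the key if it returns `None`.

```python
from collections import Counter
from typing import Dict, Iterable, List, Optional


def update_total_count(current_count: List[int], count_state: Optional[int]) -> int:
    # Same logic as updateTotalCount above: start from 0 for a new key,
    # then add this batch's new values to the running total.
    if count_state is None:
        count_state = 0
    return sum(current_count, count_state)


def count_batch(lines: Iterable[str]) -> Dict[str, int]:
    # Local stand-in for flatMap(split) -> map((word, 1)) -> reduceByKey(+).
    return dict(Counter(word for line in lines for word in line.split()))


def update_state_by_key(state: Dict[str, int], batch: Dict[str, int]) -> Dict[str, int]:
    # Simplified model of updateStateByKey: the update function runs for every
    # key with existing state or new data; keys with no new data get an empty
    # list, and a None result removes the key from the state.
    new_state: Dict[str, int] = {}
    for key in set(state) | set(batch):
        new_values = [batch[key]] if key in batch else []
        result = update_total_count(new_values, state.get(key))
        if result is not None:
            new_state[key] = result
    return new_state


def run_batches(batches: List[List[str]]) -> List[Dict[str, int]]:
    # Feed each batch of lines through the pipeline and record the state after it.
    state: Dict[str, int] = {}
    history = []
    for lines in batches:
        state = update_state_by_key(state, count_batch(lines))
        history.append(dict(state))
    return history


for totals in run_batches([["to be or not to be"], ["to see"]]):
    print(sorted(totals.items()))
# [('be', 2), ('not', 1), ('or', 1), ('to', 2)]
# [('be', 2), ('not', 1), ('or', 1), ('see', 1), ('to', 3)]
```

Words from earlier batches (`be`, `not`, `or`) keep their totals in the second batch even though they do not appear in it, which is the difference between this stateful count and a plain per-batch `reduceByKey`.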
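`socketTextStream("localhost", 9999)` connects to port 9999 as a client and reads UTF-8, newline-delimited text, so a server must be listening there for the job to receive anything; the Spark documentation uses `nc -lk 9999` for this. As a stdlib-only alternative, here is a minimal sketch of such a line server. `make_server` and `serve_lines` are hypothetical helpers written for this illustration, not part of Spark.

```python
import socket
import time
from typing import Iterable


def make_server(host: str = "localhost", port: int = 9999) -> socket.socket:
    # Listening TCP socket for socketTextStream(host, port) to connect to.
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((host, port))
    server.listen(1)
    return server


def serve_lines(server: socket.socket, lines: Iterable[str], delay: float = 0.0) -> None:
    # Accept a single client, send each line newline-terminated (the stream
    # splits its input on newlines), wait `delay` seconds after each line,
    # then close the connection.
    conn, _addr = server.accept()
    with conn:
        for line in lines:
            conn.sendall((line + "\n").encode("utf-8"))
            time.sleep(delay)
```

For example, start `serve_lines(make_server(), ["to be or not to be", "to see"], delay=12.0)` before launching the job. The 12-second gap is longer than the 10-second batch interval, so the two lines should land in different batches and the printed total for `to` should grow from one batch to the next. The sketch closes the connection after the last line, so it suits a quick test rather than a long-running feed.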