spark_streaming
from pyspark import SparkContext
from pyspark.streaming import StreamingContext


def updateCount(newCounts, state):
    # Add this batch's counts to the running total carried in the state.
    if state is None:
        return sum(newCounts)
    return state + sum(newCounts)


def main():
    sc = SparkContext(appName="Pyspark_Streaming_Demo")
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 2)  # micro-batch interval of 2 seconds

    # Read lines of text from a TCP socket source.
    lines = ssc.socketTextStream("localhost", 9009)

    # Split each line into words and count occurrences within the batch.
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda x: (x, 1)) \
        .reduceByKey(lambda a, b: a + b)

    # Checkpointing is required for stateful operations such as updateStateByKey.
    ssc.checkpoint("result/checkpoints")

    # Maintain a running count per word across all batches.
    totalWords = counts.updateStateByKey(updateCount)

    # Sort by count, descending, and print the top 30 entries each batch.
    totalWords = totalWords.transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False))
    totalWords.pprint(30)

    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
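The socket source above expects something listening on localhost:9009 that writes newline-terminated text to connected clients. The simplest option is `nc -lk 9009`; the sketch below is an assumed stand-in test sender in Python (the port and sample lines are placeholders, not part of the original gist).

import socket
import time

# Minimal test data sender for the streaming job above (assumed port 9009).
# Spark's socketTextStream connects as a client, so this script acts as the server.
HOST, PORT = "localhost", 9009
SAMPLE_LINES = ["hello spark streaming", "hello world", "spark streaming demo"]

with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind((HOST, PORT))
    server.listen(1)
    conn, _ = server.accept()  # wait for the Spark receiver to connect
    with conn:
        while True:
            for line in SAMPLE_LINES:
                conn.sendall((line + "\n").encode("utf-8"))
            time.sleep(2)  # roughly one batch interval between bursts

Start the sender (or netcat) first, then submit the streaming script with spark-submit; the running word counts are printed every two seconds.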