PySpark streaming from and to file
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
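# Create (or reuse) the SparkSession that drives the streaming query.
spark = SparkSession.builder \
    .appName('Streaming') \
    .getOrCreate()
# Use the public accessor rather than the private `_sc` attribute.
sc = spark.sparkContext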
# Alternative source: read lines from a socket on localhost:9999.
# lines = spark.readStream \
#     .format('socket') \
#     .option('host', 'localhost') \
#     .option('port', 9999) \
#     .load()
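# File source: every text file that appears in the directory becomes new input.
# Each line arrives as one row in a single string column named `value`.
lines = spark.readStream \
    .format('text') \
    .option('path', '/home/sallso/tmp/') \
    .load()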
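# Split each line on single spaces and emit one row per word.
words = lines.select(
    explode(
        split(lines.value, ' ')
    ).alias('word')
)
# Running count of each word across all input seen so far.
wordCounts = words.groupBy('word').count()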
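# Print the full, updated counts table to the console after every micro-batch.
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
# Block until the query is stopped or fails, then shut Spark down.
query.awaitTermination()
sc.stop()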
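
To see what the `split` / `explode` / `groupBy('word').count()` chain computes without starting Spark, here is a rough plain-Python analogue. It is only a sketch: `count_words`, `count_nonempty_words`, and the sample lines are made up for illustration, and it assumes that Spark's `split` on a single space behaves like Python's `str.split(" ")` in keeping empty strings between consecutive spaces.

```python
from collections import Counter


def count_words(lines):
    """Mimic explode(split(value, ' ')) followed by groupBy('word').count()."""
    counts = Counter()
    for line in lines:
        # split(" ") keeps empty strings when two spaces are adjacent,
        # so "" can show up as a "word" in the result.
        counts.update(line.split(" "))
    return counts


def count_nonempty_words(lines):
    """Same as count_words, but drop the empty tokens before counting."""
    counts = Counter()
    for line in lines:
        counts.update(token for token in line.split(" ") if token)
    return counts


# Hypothetical input: the second line contains a double space.
sample_lines = ["to be or", "not  to be"]
raw_counts = count_words(sample_lines)
clean_counts = count_nonempty_words(sample_lines)
```

If the empty token is unwanted in the streaming job, one option is to filter it out before grouping, for example with `words.filter(words.word != '')`.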
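
The title mentions writing to a file, while the script prints to the console. Spark's built-in file sink only supports append output mode, so a "complete" aggregation such as the word count cannot be pointed at it directly. One possible workaround is `foreachBatch` (available from Spark 2.4), which hands each micro-batch to an ordinary function as a regular DataFrame. The sketch below assumes that approach; `start_counts_to_files`, `output_dir`, and `checkpoint_dir` are hypothetical names, not part of the original script.

```python
def start_counts_to_files(word_counts, output_dir, checkpoint_dir):
    """Start a streaming query that rewrites the counts as CSV on every batch.

    word_counts is expected to be a streaming DataFrame such as `wordCounts`.
    """

    def write_batch(batch_df, batch_id):
        # In complete mode each batch holds the whole counts table,
        # so overwriting the directory keeps only the latest snapshot.
        batch_df.write.mode("overwrite").csv(output_dir, header=True)

    return (
        word_counts.writeStream
        .outputMode("complete")
        .foreachBatch(write_batch)
        # The checkpoint lets the query resume after a restart.
        .option("checkpointLocation", checkpoint_dir)
        .start()
    )
```

It would replace the console query, for example `query = start_counts_to_files(wordCounts, '/tmp/wordcounts', '/tmp/wordcounts_ckpt')` followed by `query.awaitTermination()`; both paths are placeholders.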