Created
April 8, 2022 08:40
-
-
Save sallos-cyber/a14e03da49cc0c873a651628dba4d096 to your computer and use it in GitHub Desktop.
PySpark Structured Streaming from files (word counts printed to the console)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Structured Streaming word count: read text files from a directory, print counts.

Spark monitors INPUT_DIR as a streaming text source, splits every line into
whitespace-separated words, and prints the running count of each word to the
console on every trigger. The script blocks until the query stops or fails,
then shuts the Spark session down.
"""
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.streaming import StreamingContext  # NOTE(review): not used below
from pyspark.sql import SparkSession
import time  # NOTE(review): not used below

# Directory watched by the streaming text source.
INPUT_DIR = '/home/sallso/tmp/'

spark = SparkSession.builder \
    .appName('Streaming') \
    .getOrCreate()

# Public accessor instead of the private `spark._sc` attribute.
sc = spark.sparkContext

# The 'text' source yields one row per line, in the string column `value`.
lines = spark.readStream \
    .format('text') \
    .option('path', INPUT_DIR) \
    .load()

# Split on runs of whitespace (a regex) rather than a single space, so that
# repeated spaces or tabs do not produce empty "words". Blank lines and lines
# with leading whitespace still yield an empty string, so filter those out.
words = lines.select(
    explode(
        split(lines.value, r'\s+')
    ).alias('word')
)
words = words.filter(words.word != '')

wordCounts = words.groupBy('word').count()

# "complete" output mode prints the entire word-count table on every trigger.
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

try:
    # Blocks until the query is stopped or terminates with an exception.
    query.awaitTermination()
finally:
    # Stop the session (which also stops its SparkContext) even if the
    # query failed, so resources are always released.
    spark.stop()
Sign up for free to join this conversation on GitHub.
Already have an account?
Sign in to comment