@wfaria
Created August 23, 2018 17:36
Spark Streaming with Kafka example for a Medium article using Python
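The script below uses `KafkaUtils` from the Kafka 0.8 connector, which is not bundled with Spark itself. Assuming a Spark 2.x build against Scala 2.11, one way to launch it is with `spark-submit` and the `--packages` flag; the connector version should match your Spark version, and the file name `heartbeat_streaming.py` is a placeholder for wherever you saved the gist:

```shell
# Pull the spark-streaming-kafka-0-8 connector from Maven and run the job.
spark-submit \
  --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.1 \
  heartbeat_streaming.py
```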
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def main():
    # Task configuration.
    topic = "heartbeat"
    brokerAddresses = "localhost:9092"
    batchTime = 20  # Batch interval, in seconds.

    # Creating the stream.
    spark = SparkSession.builder.appName("PythonHeartbeatStreaming").getOrCreate()
    sc = spark.sparkContext
    ssc = StreamingContext(sc, batchTime)
    kvs = KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": brokerAddresses})

    # The stream can be handled like a usual RDD object; each record
    # is a (key, message) tuple, for example:
    # kvs.map(lambda kv: kv[1].lower())
    kvs.pprint()  # Just printing each batch on stdout.

    # Starting the task run.
    ssc.start()
    ssc.awaitTermination()


if __name__ == '__main__':
    main()
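To see output from the stream, something has to publish to the heartbeat topic. A minimal producer-side sketch is below; the JSON payload shape (`device_id`, `ts`) is an assumption for illustration, not part of the original gist, and any string written to the topic would reach the stream above:

```python
import json
import time


def build_heartbeat(device_id, ts=None):
    """Serialize one heartbeat event as a JSON line.

    The payload fields (device_id, ts) are hypothetical; the
    streaming job above just prints whatever strings arrive.
    """
    return json.dumps({"device_id": device_id, "ts": ts or int(time.time())})


if __name__ == "__main__":
    # Print a few heartbeat lines to stdout.
    for i in range(3):
        print(build_heartbeat("device-%d" % i))
```

The printed lines can be piped into the topic with Kafka's stock console producer, e.g. `python producer.py | kafka-console-producer.sh --broker-list localhost:9092 --topic heartbeat`.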