Skip to content

Instantly share code, notes, and snippets.

Created February 28, 2019 15:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ravi-tejarockon/0d33e1e317180cd1c3e1068d85a7e5d5 to your computer and use it in GitHub Desktop.
Save ravi-tejarockon/0d33e1e317180cd1c3e1068d85a7e5d5 to your computer and use it in GitHub Desktop.
Spark & Kafka Integration - Spark Structured Streaming (Spark 2.x)
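# How to run:
# Command: bin\spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 <location_of_code> <bootstrap-servers> <subscribe-type> <topics>
#   e.g. <bootstrap-servers> = localhost:9092 (a Kafka broker address, not the ZooKeeper address),
#        <subscribe-type> = subscribe, <topics> = <topic_name>
# Run from: <location_of_spark_bin>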
from __future__ import print_function
import sys
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 pyspark-shell'
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
_USAGE = "Usage: <bootstrap-servers> <subscribe-type> <topics>"

# Option names the Spark Kafka source accepts for selecting what to read;
# the chosen one is passed straight through as an .option() key below.
_SUBSCRIBE_TYPES = ("assign", "subscribe", "subscribePattern")


def parse_args(argv):
    """Validate the command line and return its three Kafka settings.

    :param argv: full argument vector in ``sys.argv`` form (program name first).
    :returns: tuple ``(bootstrap_servers, subscribe_type, topics)``.
    :raises SystemExit: with a non-zero status, after printing a message to
        stderr, when the argument count is not exactly three or the
        subscribe type is not one of ``_SUBSCRIBE_TYPES``.
    """
    if len(argv) != 4:
        print(_USAGE, file=sys.stderr)
        sys.exit(-1)
    bootstrap_servers, subscribe_type, topics = argv[1:4]
    if subscribe_type not in _SUBSCRIBE_TYPES:
        # Fail fast with a clear message instead of letting Spark reject the
        # unknown option only once the stream is started.
        print("Invalid <subscribe-type> {!r}: expected one of {}".format(
            subscribe_type, ", ".join(_SUBSCRIBE_TYPES)), file=sys.stderr)
        print(_USAGE, file=sys.stderr)
        sys.exit(-1)
    return bootstrap_servers, subscribe_type, topics


def _read_kafka_lines(spark, bootstrap_servers, subscribe_type, topics):
    """Return a streaming DataFrame of Kafka record values cast to a string column ``value``."""
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option(subscribe_type, topics)
        .load()
        .selectExpr("CAST(value AS STRING)")
    )


def _count_words(lines):
    """Split each line on single spaces and return a running count per distinct word."""
    # explode turns each item of the split array into a separate row.
    words = lines.select(explode(split(lines.value, " ")).alias("word"))
    return words.groupBy("word").count()


def main(argv=None):
    """Run the Kafka word-count stream, printing running counts to the console.

    :param argv: optional argument vector; defaults to ``sys.argv``.
    Blocks until the streaming query terminates.
    """
    bootstrap_servers, subscribe_type, topics = parse_args(
        sys.argv if argv is None else argv)

    spark = SparkSession.builder.appName("StructuredKafkaWordCount").getOrCreate()

    lines = _read_kafka_lines(spark, bootstrap_servers, subscribe_type, topics)
    word_counts = _count_words(lines)

    # "complete" mode re-emits the whole table of counts on every trigger so the
    # console always shows the running totals.
    query = (
        word_counts.writeStream
        .outputMode("complete")
        .format("console")
        .start()
    )
    # start() returns immediately; block here so the driver does not exit and
    # tear down the stream.
    query.awaitTermination()


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment