@ravi-tejarockon · Created February 28, 2019 15:50
Spark & Kafka Integration - Spark Streaming (Spark 1.6.3)
# How to run (execute from <location_of_spark_bin>):
#   bin\spark-submit --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.3 <location_of_code>/kafkaPySpark.py localhost:2181 <topic_name>
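# (Assumption, not part of the original gist) To feed the job some test input,
# messages can be published with the console producer that ships with Kafka,
# assuming a local broker on the default port:
#   bin/kafka-console-producer.sh --broker-list localhost:9092 --topic <topic_name>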
from __future__ import print_function
import os
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
# Pull in the Kafka connector package when the script is launched through a
# bare pyspark shell rather than spark-submit
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka_2.10:1.6.3 pyspark-shell'
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafkaPySpark.py <zk> <topic>", file=sys.stderr)
        exit(-1)

    # Stream in 1-second micro-batches
    sc = SparkContext(appName="PythonStreamingKafkaWordCount")
    ssc = StreamingContext(sc, 1)

    # Receiver-based Kafka stream: connect through the ZooKeeper quorum under
    # consumer group "spark-streaming-consumer", with one receiver thread for
    # the topic
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})

    # Each Kafka record arrives as a (key, message) pair; keep the message only
    lines = kvs.map(lambda x: x[1])

    # Word count over each micro-batch
    counts = lines.flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()
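For comparison, Spark 1.6 also ships a receiverless "direct" Kafka API that reads from the brokers instead of going through ZooKeeper. Below is a minimal sketch of the same word count on that API; the file name kafkaPySparkDirect.py and the broker address localhost:9092 are assumptions, not part of the original gist.

# kafkaPySparkDirect.py -- hypothetical companion sketch, not from the gist
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: kafkaPySparkDirect.py <topic>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingKafkaDirectWordCount")
    ssc = StreamingContext(sc, 1)

    # Connect straight to the broker list; no ZooKeeper quorum and no
    # receiver threads are involved
    topic = sys.argv[1]
    directKvs = KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": "localhost:9092"})

    # Same per-batch word count as the receiver-based version above
    counts = directKvs.map(lambda x: x[1]) \
        .flatMap(lambda line: line.split(" ")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()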