@fonylew
Created June 29, 2017 09:10
Simple Kafka direct consumer with Spark Streaming (Python) -- just consume it!
import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

if __name__ == "__main__":
    sc = SparkContext(appName="PythonStreamingDirectKafkaWordCount")
    sc.setLogLevel("ERROR")

    # Micro-batch interval of 2 seconds.
    ssc = StreamingContext(sc, 2)

    # Broker list and topic are taken from the command line.
    brokers, topic = sys.argv[1:]

    # Direct (receiver-less) stream: Spark queries Kafka for offsets itself.
    kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

    # Each record is a (key, value) pair; keep only the message value.
    lines = kvs.map(lambda x: x[1].encode('utf-8'))

    # Uncomment to turn the plain consumer into a word count:
    # counts = lines.flatMap(lambda line: line.split(" ")) \
    #     .map(lambda word: (word, 1)) \
    #     .reduceByKey(lambda a, b: a + b)
    # counts.pprint()

    # Print a sample of each batch to the driver console.
    lines.pprint()

    ssc.start()
    ssc.awaitTermination()
    # ssc.awaitTerminationOrTimeout(60)
    ssc.stop()
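To try it out, submit the script with spark-submit and make the external Kafka connector available on the classpath. This is only a sketch: the artifact coordinates assume Spark 2.1.x built for Scala 2.11 with the 0-8 Kafka connector, and the script name, broker list, and topic below are placeholders to adapt to your setup.

spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 \
    direct_kafka_consumer.py broker1:9092,broker2:9092 my-topic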