@jrx
Created November 8, 2017 15:54
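from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Kafka 0.8 direct-stream API: available in Spark 2.x, removed in Spark 3.0.
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="StreamingExampleWithKafka")
ssc = StreamingContext(sc, 10)  # 10-second batch interval

# Kafka broker address (host:port); adjust for your cluster.
opts = {"metadata.broker.list": "kafka-0-broker.kafka.autoip.dcos.thisdcos.directory:1025"}
kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], opts)

# Each record is a (key, value) pair; keep only the message value.
lines = kvs.map(lambda x: x[1])

# Word count per batch: split on spaces, pair each word with 1, sum by word.
counts = lines.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b)
counts.pprint()  # prints the first 10 elements of each batch

ssc.start()
ssc.awaitTermination()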
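
To check the counting logic without a Spark cluster or Kafka broker, the flatMap / map / reduceByKey chain above can be approximated in plain Python for a single batch. This is only a sketch: `count_words` is a hypothetical helper, not part of the gist or of PySpark, and it assumes a batch is simply a list of message strings.

```python
from collections import Counter


def count_words(lines):
    """Count words in one batch of message values (hypothetical helper)."""
    counts = Counter()
    for line in lines:
        # Same split as the streaming job: split(" ") keeps the empty
        # strings produced by consecutive spaces.
        counts.update(line.split(" "))
    return dict(counts)


if __name__ == "__main__":
    batch = ["hello world", "hello spark"]
    print(count_words(batch))
```

Because the job splits on a single space rather than on arbitrary whitespace, messages containing repeated spaces produce empty-string "words". Switching to `line.split()` would avoid that, at the cost of changing the job's behavior.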