Skip to content

Instantly share code, notes, and snippets.

@boneill42
Created April 7, 2016 18:37
Show Gist options
  • Save boneill42/020dde814346c6b4ad0ba28406c3ea10 to your computer and use it in GitHub Desktop.
Save boneill42/020dde814346c6b4ad0ba28406c3ea10 to your computer and use it in GitHub Desktop.
Kinesis PySpark example
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream
import sys
def foo(line):
    """Echo *line* to stdout, then return it split on single spaces.

    The separator is the literal ``" "`` rather than ``str.split()``'s
    default whitespace splitting, so runs of consecutive spaces produce
    empty-string fields and an empty input yields ``[""]``.

    :param line: the text to echo and split.
    :returns: list of fields between single-space separators.
    """
    print(line)
    separator = " "
    fields = line.split(separator)
    return fields
def printRecord(rdd):
    """Print a separator banner, then print every record in *rdd*.

    Used as the ``DStream.foreachRDD`` callback in ``__main__``, so it is
    called once per streaming batch with that batch's RDD.

    The banner is printed by the process that calls this function (the
    driver). ``rdd.foreach`` runs ``_print_record`` inside Spark tasks, so
    on a cluster those lines appear in the executors' stdout, not on the
    driver console.

    :param rdd: an RDD (any object with a ``foreach(f)`` method works).
    """
    def _print_record(record):
        # Encode only on Python 2, where printing a ``unicode`` value to a
        # non-UTF-8 stream can raise UnicodeEncodeError. Already-encoded
        # ``str`` (== bytes) values are left alone: calling ``.encode`` on
        # them triggers an implicit ASCII decode that fails on non-ASCII
        # bytes. On Python 3 the record is printed as-is; encoding it there
        # would print the bytes repr (b'...') instead of the text.
        if sys.version_info[0] < 3 and not isinstance(record, bytes):
            record = record.encode('utf8')
        print(record)

    print("========================================================")
    print("Starting new RDD")
    print("========================================================")
    rdd.foreach(_print_record)
if __name__ == "__main__":
    # Python 2 only: make the driver's implicit str<->unicode conversions use
    # UTF-8. ``reload`` is not a builtin and ``sys.setdefaultencoding`` does
    # not exist on Python 3, so running these unguarded fails there before
    # the arguments are even checked.
    # NOTE(review): this only changes the driver process; code that Spark
    # runs in tasks (e.g. the rdd.foreach callback) does not see it.
    if sys.version_info[0] < 3:
        reload(sys)  # noqa: F821 -- Python 2 builtin
        sys.setdefaultencoding('utf-8')

    if len(sys.argv) != 5:
        print( "Usage: dump.py <app-name> <stream-name> <endpoint-url> <region-name>", file=sys.stderr)
        sys.exit(-1)

    # argv[1] is passed to KinesisUtils.createStream as the Kinesis (KCL)
    # application name; it is separate from the SparkContext appName below.
    appName, streamName, endpointUrl, regionName = sys.argv[1:]

    # Streaming batch duration and Kinesis checkpoint interval, in seconds.
    batch_interval_seconds = 10
    checkpoint_interval_seconds = 10

    sc = SparkContext(appName="PythonStreamingKinesisWordCountAsl")
    ssc = StreamingContext(sc, batch_interval_seconds)
    # TRIM_HORIZON: when no checkpoint exists yet, start from the oldest
    # record still retained in the stream rather than only new records.
    dstream = KinesisUtils.createStream(
        ssc, appName, streamName, endpointUrl, regionName,
        InitialPositionInStream.TRIM_HORIZON, checkpoint_interval_seconds)
    dstream.foreachRDD(printRecord)

    ssc.start()
    ssc.awaitTermination()
@devinbostIL
Copy link

You're importing sys twice.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment