@bibiboot
Last active September 25, 2017 09:01
PySpark hello world script
# A DataFrame (a SchemaRDD in pre-1.3 Spark) can be created for a JSON dataset
# represented by an RDD[String] storing one JSON object per string.
anotherPeopleRDD = sc.parallelize([
    '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}',
    '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}',
])
anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
anotherPeople.show()         # show() prints the rows itself and returns None
anotherPeople.printSchema()  # printSchema() prints the inferred nested schema
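The snippet above uses the Spark 1.x SQLContext API (sqlContext.jsonRDD). A minimal sketch of the same hello world on Spark 2.x+, assuming spark.read.json there still accepts an RDD of JSON strings; the app name is arbitrary:

# Spark 2.x+ equivalent: SparkSession replaces SQLContext.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pyspark-hello-world").getOrCreate()
peopleRDD = spark.sparkContext.parallelize([
    '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}',
    '{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}',
])
people = spark.read.json(peopleRDD)  # schema is inferred; address becomes a struct
people.show()
people.printSchema()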

Hello world scripts for PySpark

"""
{"created_at": 122, "id": 748, "text": "Hello Im posting a tweet!"}
{"created_at": 1222, "id": 748, "text": "Im posting a tweet!"}
""""
from pyspark.sql.types import *
fields = [
    StructField("text", StringType(), True),
    StructField("id", IntegerType(), True),
    StructField("created_at", IntegerType(), True),
]
jsonSchema = StructType(fields)
#df = sqlContext.read.load("/FileStore/tables/gorke4nr1467407735399/citylots.json", format="json", schema=jsonSchema)
df = sqlContext.read.json("/FileStore/tables/gorke4nr1467407735399/citylots.json", schema=jsonSchema)
df.show()  # show() prints the rows itself and returns None
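A small usage sketch for the DataFrame loaded above; the filter value comes from the sample created_at values in the docstring and is only illustrative:

df.select("id", "text").show()          # column names follow the explicit jsonSchema
df.filter(df.created_at > 122).show()   # e.g. keep only the later sample tweet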
#{"id": 748238620146839552, "text": "Hello Im posting a tweet!"}
#{"id": 748237961842483200, "text": "Im posting a tweet!"}
# The JSON must use double quotes (strict JSON) for the code below to parse it.
# One JSON object per line.
path = "/tmp/data.json"
#df = sqlContext.jsonFile(path)  # deprecated in favor of sqlContext.read.json
#df = sqlContext.read.load("FileStore/tables/36op5ogr1467404497005/citylots.json", format="json")
df = sqlContext.read.json(path)
df.printSchema()  # prints the inferred schema and returns None
df.show()
#{"id": 748238620146839552, "text": "Hello Im posting a tweet!"}
#{"id": 748237961842483200, "text": "Im posting a tweet!"}
# The JSON must use double quotes (strict JSON) for json.loads to parse it.
# One JSON object per line.
import json

path = "/tmp/data.json"
rdd_strings = sc.textFile(path)
rdd_dicts = rdd_strings.map(json.loads)  # parse each line into a Python dict
print rdd_dicts.collect()
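If the parsed dictionaries should end up as a DataFrame, one option (a sketch, not from the original gist) is to map each dict to a Row so the keys carry over as column names:

from pyspark.sql import Row

rdd_rows = rdd_dicts.map(lambda d: Row(**d))   # dict keys become column names
df_from_rdd = sqlContext.createDataFrame(rdd_rows)
df_from_rdd.printSchema()
df_from_rdd.show()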
#https://docs.cloud.databricks.com/docs/latest/databricks_guide/07%20Spark%20Streaming/06%20FileStream%20Word%20Count%20-%20Python.html
from pyspark.streaming import StreamingContext
batchIntervalSeconds = 10
def creatingFunc():
    ssc = StreamingContext(sc, batchIntervalSeconds)
    # Set each DStream in this context to remember the RDDs it generated in the last
    # given duration. DStreams remember RDDs only for a limited time and then release
    # them for garbage collection; this lets the developer query old data outside the
    # DStream computation.
    ssc.remember(60)
    lines = ssc.textFileStream("YOUR_S3_PATH_HERE")
    lines.pprint()
    words = lines.flatMap(lambda line: line.split(","))
    pairs = words.map(lambda word: (word, 1))
    wordCounts = pairs.reduceByKey(lambda x, y: x + y)

    def process(time, rdd):
        df = sqlContext.createDataFrame(rdd)
        df.registerTempTable("myCounts")

    wordCounts.foreachRDD(process)
    return ssc
checkpointDir = "dbfs:/SOME_CHECKPOINT_DIRECTORY"
ssc = StreamingContext.getActiveOrCreate(checkpointDir, creatingFunc)
# This line starts the streaming context in the background.
ssc.start()
# This ensures the cell is put on hold until the background streaming thread is started properly.
ssc.awaitTerminationOrTimeout(batchIntervalSeconds * 2)
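Once a batch or two has run, the word counts registered by process() can be queried from another cell with Spark SQL; a minimal sketch, assuming the same sqlContext as above:

# "myCounts" is the temp table name registered inside process().
counts = sqlContext.sql("SELECT * FROM myCounts")
counts.show()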
from pyspark.streaming import StreamingContext
dvc = [[-0.1, -0.1], [0.1, 0.1], [1.1, 1.1], [0.75, 0.75], [0.9, 0.9]]
dvc = [sc.parallelize(i, 1) for i in dvc]
ssc = StreamingContext(sc, 2.0)
input_stream = ssc.queueStream(dvc)
def get_output(rdd):
    rdd_data = rdd.collect()
    if 0.75 in rdd_data:
        print "Ending marker found", rdd_data
        ssc.stop()
    else:
        print "Ending marker not found, continuing"
        print rdd_data
input_stream.foreachRDD(get_output)
ssc.start()
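Outside an interactive notebook, the driver usually has to block until get_output() calls ssc.stop() when it sees the 0.75 marker; a minimal sketch of that final step:

# Block until the streaming context is stopped by get_output() above
# (or use ssc.awaitTerminationOrTimeout(seconds) to cap the wait).
ssc.awaitTermination()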