#!/usr/bin/env python
import sys, os, re
import json
import datetime, iso8601
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, OffsetRange, TopicAndPartition
# Save to Mongo
from bson import json_util
import pymongo_spark
pymongo_spark.activate()
def main(base_path):

    APP_NAME = "make_predictions_streaming.py"

    # Process data every 10 seconds
    PERIOD = 10
    BROKERS = 'localhost:9092'
    PREDICTION_TOPIC = 'flight_delay_classification_request'

    try:
        sc and ssc
    except NameError as e:
        import findspark

        # Add the streaming package and initialize
        findspark.add_packages(["org.apache.spark:spark-streaming-kafka-0-8_2.11:2.0.2"])
        findspark.init()

        import pyspark
        import pyspark.sql
        import pyspark.streaming

        conf = SparkConf().set("spark.default.parallelism", 1)
        sc = SparkContext(appName="Agile Data Science: PySpark Streaming 'Hello, World!'", conf=conf)
        ssc = StreamingContext(sc, PERIOD)
        spark = pyspark.sql.SparkSession(sc).builder.appName(APP_NAME).getOrCreate()

    #
    # Load all models to be used in making predictions
    #

    # Load the arrival delay bucketizer
    from pyspark.ml.feature import Bucketizer
    arrival_bucketizer_path = "{}/models/arrival_bucketizer.bin".format(base_path)
    arrival_bucketizer = Bucketizer.load(arrival_bucketizer_path)

    # Load the departure delay bucketizer
    departure_bucketizer_path = "{}/models/departure_bucketizer.bin".format(base_path)
    departure_bucketizer = Bucketizer.load(departure_bucketizer_path)

    # Load all the string field vectorizer pipelines into a dict
    from pyspark.ml import PipelineModel

    string_vectorizer_pipeline_models = {}
    for column in ["Carrier", "DayOfMonth", "DayOfWeek", "DayOfYear",
                   "Origin", "Dest", "FlightNum", "DepDelayBucket"]:
        string_pipeline_model_path = "{}/models/string_indexer_pipeline_model_{}.bin".format(
            base_path,
            column
        )
        string_pipeline_model = PipelineModel.load(string_pipeline_model_path)
        string_vectorizer_pipeline_models[column] = string_pipeline_model

    # Load the numeric vector assembler
    from pyspark.ml.feature import VectorAssembler
    vector_assembler_path = "{}/models/numeric_vector_assembler.bin".format(base_path)
    vector_assembler = VectorAssembler.load(vector_assembler_path)

    # Load the final assembler
    final_assembler_path = "{}/models/final_vector_assembler.bin".format(base_path)
    final_assembler = VectorAssembler.load(final_assembler_path)

    # Load the classifier model
    from pyspark.ml.classification import RandomForestClassificationModel
    random_forest_model_path = "{}/models/spark_random_forest_classifier.flight_delays.bin".format(
        base_path
    )
    rfc = RandomForestClassificationModel.load(random_forest_model_path)

    #
    # Process Prediction Requests in Streaming
    #
    stream = KafkaUtils.createDirectStream(
        ssc,
        [PREDICTION_TOPIC],
        {
            "metadata.broker.list": BROKERS,
            "group.id": "0",
        }
    )

    # Parse the JSON prediction requests
    object_stream = stream.map(lambda x: json.loads(x[1]))
    object_stream.pprint()

    # Convert the parsed dicts into Rows so they can become a DataFrame
    row_stream = object_stream.map(
        lambda x: Row(
            FlightDate=iso8601.parse_date(x['FlightDate']),
            Origin=x['Origin'],
            Distance=x['Distance'],
            DayOfMonth=x['DayOfMonth'],
            DayOfYear=x['DayOfYear'],
            UUID=x['UUID'],
            DepDelay=x['DepDelay'],
            DayOfWeek=x['DayOfWeek'],
            FlightNum=x['FlightNum'],
            Dest=x['Dest'],
            Timestamp=iso8601.parse_date(x['Timestamp']),
            Carrier=x['Carrier']
        )
    )
    row_stream.pprint()

    #
    # Create a DataFrame from each RDD of prediction requests, classify it and store the result
    #
    def classify_prediction_requests(rdd):

        from pyspark.sql.types import StringType, IntegerType, DoubleType, DateType, TimestampType
        from pyspark.sql.types import StructType, StructField

        prediction_request_schema = StructType([
            StructField("Carrier", StringType(), True),
            StructField("DayOfMonth", IntegerType(), True),
            StructField("DayOfWeek", IntegerType(), True),
            StructField("DayOfYear", IntegerType(), True),
            StructField("DepDelay", DoubleType(), True),
            StructField("Dest", StringType(), True),
            StructField("Distance", DoubleType(), True),
            StructField("FlightDate", DateType(), True),
            StructField("FlightNum", StringType(), True),
            StructField("Origin", StringType(), True),
            StructField("Timestamp", TimestampType(), True),
            StructField("UUID", StringType(), True),
        ])

        prediction_requests_df = spark.createDataFrame(rdd, schema=prediction_request_schema)
        prediction_requests_df.show()

        # Bucketize the departure delay for classification
        ml_bucketized_features = departure_bucketizer.transform(prediction_requests_df)

        # Check the buckets
        ml_bucketized_features.select("DepDelay", "DepDelayBucket").show()

        # Vectorize string fields with the corresponding pipeline for that column:
        # turn category fields into categorical feature vectors, then drop the intermediate index fields
        for column in ["Carrier", "DayOfMonth", "DayOfWeek", "DayOfYear",
                       "Origin", "Dest", "FlightNum", "DepDelayBucket"]:
            string_pipeline_model = string_vectorizer_pipeline_models[column]
            ml_bucketized_features = string_pipeline_model.transform(ml_bucketized_features)
            ml_bucketized_features = ml_bucketized_features.drop(column + "_index")

        # Vectorize the numeric columns (DepDelay and Distance)
        ml_bucketized_features = vector_assembler.transform(ml_bucketized_features)

        # Combine the various features into one feature vector, 'Features_vec'
        final_vectorized_features = final_assembler.transform(ml_bucketized_features)
        final_vectorized_features.show()

        # Drop the individual vector columns
        feature_columns = ["Carrier_vec", "DayOfMonth_vec", "DayOfWeek_vec", "DayOfYear_vec",
                           "Origin_vec", "Dest_vec", "FlightNum_vec", "DepDelayBucket_vec",
                           "NumericFeatures_vec"]
        for column in feature_columns:
            final_vectorized_features = final_vectorized_features.drop(column)

        # Inspect the finalized features
        final_vectorized_features.show()

        # Make the prediction
        predictions = rfc.transform(final_vectorized_features)

        # Drop the features vector and prediction metadata to leave the original fields
        predictions = predictions.drop("Features_vec")
        final_predictions = predictions.drop("indices").drop("values").drop("rawPrediction").drop("probability")

        # Inspect the output
        final_predictions.show()

        # Store the responses in MongoDB
        if final_predictions.count() > 0:
            final_predictions.rdd.map(lambda x: x.asDict()).saveToMongoDB(
                "mongodb://localhost:27017/agile_data_science.flight_delay_classification_response"
            )

    # Do the classification and store to Mongo
    row_stream.foreachRDD(classify_prediction_requests)

    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main(sys.argv[1])
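
# For reference, the job above expects each message on the
# flight_delay_classification_request topic to be a JSON document with the
# fields parsed into row_stream. Below is a minimal test-producer sketch to
# exercise it. It assumes the kafka-python package and a broker on
# localhost:9092; the sample values are illustrative and not part of the
# original script.
import json
import uuid
import datetime

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda obj: json.dumps(obj).encode("utf-8"),
)

# Field names and types mirror prediction_request_schema in the streaming job
request = {
    "Carrier": "DL",
    "DayOfMonth": 15,
    "DayOfWeek": 4,
    "DayOfYear": 350,
    "DepDelay": 5.0,
    "Dest": "ATL",
    "Distance": 594.0,
    "FlightDate": "2016-12-15",
    "FlightNum": "1234",
    "Origin": "SFO",
    "Timestamp": datetime.datetime.utcnow().isoformat(),
    "UUID": str(uuid.uuid4()),
}

producer.send("flight_delay_classification_request", request)
producer.flush()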
-------------------------------------------
Time: 2016-12-15 23:38:00
-------------------------------------------
-------------------------------------------
Time: 2016-12-15 23:38:00
-------------------------------------------
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+
|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|Timestamp|UUID|
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+
+--------+--------------+
|DepDelay|DepDelayBucket|
+--------+--------------+
+--------+--------------+
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+-----------+--------------+-------------+-------------+----------+--------+-------------+------------------+-------------------+------------+
|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|Timestamp|UUID|DepDelayBucket|Carrier_vec|DayOfMonth_vec|DayOfWeek_vec|DayOfYear_vec|Origin_vec|Dest_vec|FlightNum_vec|DepDelayBucket_vec|NumericFeatures_vec|Features_vec|
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+-----------+--------------+-------------+-------------+----------+--------+-------------+------------------+-------------------+------------+
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+-----------+--------------+-------------+-------------+----------+--------+-------------+------------------+-------------------+------------+
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+------------+
|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|Timestamp|UUID|DepDelayBucket|Features_vec|
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+------------+
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+------------+
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+----------+
|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|Timestamp|UUID|DepDelayBucket|Prediction|
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+----------+
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+--------------+----------+
-------------------------------------------
Time: 2016-12-15 23:38:10
-------------------------------------------
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+
|Carrier|DayOfMonth|DayOfWeek|DayOfYear|DepDelay|Dest|Distance|FlightDate|FlightNum|Origin|Timestamp|UUID|
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+
+-------+----------+---------+---------+--------+----+--------+----------+---------+------+---------+----+
16/12/15 23:38:13 ERROR python.PythonRDD: Error while sending iterator
java.net.SocketTimeoutException: Accept timed out
at java.net.PlainSocketImpl.socketAccept(Native Method)
at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
at java.net.ServerSocket.implAccept(ServerSocket.java:545)
at java.net.ServerSocket.accept(ServerSocket.java:513)
at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:697)
16/12/15 23:38:13 ERROR scheduler.JobScheduler: Error running job streaming job 1481873890000 ms.1
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/streaming/util.py", line 65, in call
r = self.func(t, *rdds)
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/streaming/dstream.py", line 171, in takeAndPrint
taken = rdd.take(num + 1)
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/rdd.py", line 1310, in take
res = self.context.runJob(self, takeUpToNumLeft, p)
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/context.py", line 934, in runJob
return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/rdd.py", line 142, in _load_from_socket
for item in serializer.load_stream(rf):
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/serializers.py", line 139, in load_stream
yield self._read_with_length(stream)
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/serializers.py", line 156, in _read_with_length
length = read_int(stream)
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/serializers.py", line 543, in read_int
length = stream.read(4)
File "/Users/rjurney/anaconda3/lib/python3.5/socket.py", line 575, in readinto
return self._sock.recv_into(b)
socket.timeout: timed out
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
+--------+--------------+
|DepDelay|DepDelayBucket|
+--------+--------------+
+--------+--------------+
Traceback (most recent call last):
File "ch08/make_predictions_streaming.py", line 210, in <module>
main(sys.argv[1])
File "ch08/make_predictions_streaming.py", line 207, in main
ssc.awaitTermination()
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/streaming/context.py", line 206, in awaitTermination
self._jssc.awaitTermination()
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o31.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/streaming/util.py", line 65, in call
r = self.func(t, *rdds)
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/streaming/dstream.py", line 171, in takeAndPrint
taken = rdd.take(num + 1)
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/rdd.py", line 1310, in take
res = self.context.runJob(self, takeUpToNumLeft, p)
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/context.py", line 934, in runJob
return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/rdd.py", line 142, in _load_from_socket
for item in serializer.load_stream(rf):
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/serializers.py", line 139, in load_stream
yield self._read_with_length(stream)
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/serializers.py", line 156, in _read_with_length
length = read_int(stream)
File "/Users/rjurney/Software/Agile_Data_Code_2/spark/python/pyspark/serializers.py", line 543, in read_int
length = stream.read(4)
File "/Users/rjurney/anaconda3/lib/python3.5/socket.py", line 575, in readinto
return self._sock.recv_into(b)
socket.timeout: timed out
at org.apache.spark.streaming.api.python.TransformFunction.callPythonTransformFunction(PythonDStream.scala:95)
at org.apache.spark.streaming.api.python.TransformFunction.apply(PythonDStream.scala:78)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.api.python.PythonDStream$$anonfun$callForeachRDD$1.apply(PythonDStream.scala:179)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
at scala.util.Try$.apply(Try.scala:192)
at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:247)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:247)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:246)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)