The Spark ML and SQL classes are not imported by default, so you will need to import them before proceeding with ML.
from pyspark.ml import Pipeline
from pyspark.ml.feature import *
from pyspark.ml.classification import *
from pyspark.ml.evaluation import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
The first step is to load the data into Spark as a DataFrame. You can load the full set if the resources of your Spark cluster permit it, or load a selection of categories as shown below. The DataFrame dataset is what we will use as our starting point.
dataset = spark.sql("SELECT * FROM dataset WHERE Category in ('ASSAULT','VANDALISM')")
display(dataset.take(10))
Get a feel for the data using different selections, filters and groupings.
display(dataset.select("PdDistrict").distinct())
display(dataset.filter(year("Dates") == 2003) \
               .groupBy("Category", "Address") \
               .count() \
               .orderBy("Category", "Address"))
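To see what a groupBy/count aggregation produces without a cluster, the same logic can be sketched in plain Python with collections.Counter (the sample rows here are made up for illustration):

```python
from collections import Counter

# Hypothetical (Category, Address) pairs as they might appear
# after filtering on a single year.
rows = [
    ("ASSAULT", "800 Block of BRYANT ST"),
    ("ASSAULT", "800 Block of BRYANT ST"),
    ("VANDALISM", "100 Block of MARKET ST"),
]

# groupBy("Category", "Address").count() amounts to counting
# occurrences of each distinct (Category, Address) pair.
counts = Counter(rows)
for (category, address), n in sorted(counts.items()):
    print(category, address, n)
```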
First prepare columns that cannot be prepared by the Spark ML feature functions.
dataset = dataset.withColumn("Year", year("Dates"))
dataset = dataset.withColumn("Month", month("Dates"))
dataset = dataset.withColumn("Hour", hour("Dates"))
# In this dataset X is the longitude and Y the latitude
dataset = dataset.withColumn("Lat", round("Y", 5))
dataset = dataset.withColumn("Lon", round("X", 5))
display(dataset.take(10))
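The year, month and hour functions extract the same fields as their Python datetime counterparts; a minimal sketch of what they pull out of a timestamp in the format used by the Dates column:

```python
from datetime import datetime

# One timestamp in the same format as the Dates column.
ts = datetime.strptime("2003-05-17 23:45:00", "%Y-%m-%d %H:%M:%S")

# year("Dates"), month("Dates") and hour("Dates") extract these
# fields for every row of the DataFrame.
print(ts.year, ts.month, ts.hour)  # 2003 5 23
```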
You will need to split the dataset into a training set and a test set.
trainingset, testset = dataset.randomSplit([0.7, 0.3])
print("trainingset: %s, testset: %s" % (trainingset.count(), testset.count()))
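Note that randomSplit assigns each row independently with the given probabilities, so the 70/30 split is only approximate and differs between runs unless you pass a seed (e.g. randomSplit([0.7, 0.3], seed=42)). A plain-Python sketch of the same idea:

```python
import random

random.seed(42)  # fix the seed for a reproducible split
data = list(range(1000))

# Each element independently lands in the training set with
# probability 0.7, mirroring DataFrame.randomSplit([0.7, 0.3]).
training, test = [], []
for x in data:
    (training if random.random() < 0.7 else test).append(x)

print(len(training), len(test))  # roughly 700 / 300
```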
category_indexer = StringIndexer(inputCol="Category", outputCol="category_index")
dayofweek_indexer = StringIndexer(inputCol="DayOfWeek", outputCol="dow_index")
district_indexer = StringIndexer(inputCol="PdDistrict", outputCol="dis_index")
resolution_indexer = StringIndexer(inputCol="Resolution", outputCol="res_index")
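StringIndexer maps each distinct label to an index ordered by descending frequency, so the most frequent label becomes 0.0. A plain-Python sketch of that mapping (the district values are made up). Note that if the test set contains labels the indexer never saw during fitting, the transform fails unless the indexer is built with handleInvalid="keep".

```python
from collections import Counter

# Hypothetical district values as they might appear in PdDistrict.
labels = ["MISSION", "SOUTHERN", "SOUTHERN", "TENDERLOIN", "SOUTHERN", "MISSION"]

# StringIndexer orders labels by frequency, most frequent first,
# and assigns indices 0.0, 1.0, 2.0, ...
ordered = [label for label, _ in Counter(labels).most_common()]
index = {label: float(i) for i, label in enumerate(ordered)}

print(index)  # {'SOUTHERN': 0.0, 'MISSION': 1.0, 'TENDERLOIN': 2.0}
```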
period_list = ["Year", "Month"]
period_assembler = VectorAssembler(inputCols=period_list, outputCol="period")
coord_list = ["Lat","Lon"]
coord_assembler = VectorAssembler(inputCols=coord_list, outputCol="coords")
feature_list = ["period","Hour", "coords", "dis_index","dow_index", "res_index"]
vector_assembler = VectorAssembler(inputCols=feature_list, outputCol="features")
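VectorAssembler concatenates its input columns in the order of inputCols, flattening any vector-valued columns, into a single feature vector per row. A rough sketch of that flattening on one made-up example row:

```python
# Hypothetical values for one row after the earlier stages:
period = [2003.0, 5.0]           # output of period_assembler (Year, Month)
hour = 23.0
coords = [37.77562, -122.41338]  # output of coord_assembler (Lat, Lon)
dis_index, dow_index, res_index = 0.0, 3.0, 1.0

# VectorAssembler concatenates scalars and vectors, in inputCols
# order, into one dense vector.
features = []
for col in [period, hour, coords, dis_index, dow_index, res_index]:
    features.extend(col if isinstance(col, list) else [col])

print(features)
# [2003.0, 5.0, 23.0, 37.77562, -122.41338, 0.0, 3.0, 1.0]
```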
random_forest = RandomForestClassifier(featuresCol="features", \
                                       labelCol="category_index", \
                                       maxBins=14000, \
                                       maxDepth=10, \
                                       impurity="entropy", \
                                       featureSubsetStrategy="all")
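Setting impurity="entropy" makes each tree choose splits that maximise information gain, i.e. the reduction in entropy of the label distribution at a node. A small worked sketch of the entropy computation for a two-class node, using made-up class counts:

```python
import math

def entropy(counts):
    # Shannon entropy (in bits) of a class-count distribution.
    total = sum(counts)
    return sum(-(c / total) * math.log2(c / total) for c in counts if c > 0)

# A node holding 50 ASSAULT and 50 VANDALISM rows is maximally impure...
print(entropy([50, 50]))   # 1.0
# ...while a node holding only one class has zero entropy.
print(entropy([100, 0]))   # 0.0
```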
stages = [category_indexer, \
          dayofweek_indexer, \
          district_indexer, \
          resolution_indexer, \
          period_assembler, \
          coord_assembler, \
          vector_assembler, \
          random_forest]
pipeline = Pipeline(stages=stages)
model = pipeline.fit(trainingset)
prediction = model.transform(testset)
evaluator = MulticlassClassificationEvaluator(labelCol='category_index', \
                                              predictionCol='prediction', \
                                              metricName='accuracy')
accuracy = evaluator.evaluate(prediction)
print("Random Forest accuracy = {}".format(accuracy))
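The accuracy metric is simply the fraction of rows where prediction equals category_index. A plain-Python sketch of the same computation on made-up label/prediction pairs:

```python
# Hypothetical (category_index, prediction) pairs as they might
# appear in the prediction DataFrame.
pairs = [(0.0, 0.0), (1.0, 1.0), (0.0, 1.0), (1.0, 1.0)]

# metricName='accuracy' computes the fraction of correct predictions.
accuracy = sum(label == pred for label, pred in pairs) / len(pairs)
print("Random Forest accuracy = {}".format(accuracy))  # 0.75
```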