Spark ML workshop

Import relevant Spark classes

The Spark ML and SQL classes are not imported by default. You will need to import them before proceeding with ML.

from pyspark.ml import Pipeline
from pyspark.ml.feature import *          # StringIndexer, VectorAssembler
from pyspark.ml.classification import *   # RandomForestClassifier
from pyspark.ml.evaluation import *       # MulticlassClassificationEvaluator

from pyspark.sql.types import *
from pyspark.sql.functions import *       # year, month, hour, round, col

Load data set

The first step is to load the data into Spark as a DataFrame. You can either load the full set, if the resources of your Spark cluster permit it, or load a selection of categories as shown below. The DataFrame dataset is what we will use as our starting point.

dataset = spark.sql("SELECT * FROM dataset WHERE Category in ('ASSAULT','VANDALISM')")
display(dataset.take(10))
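
If you are unsure what the selection returned, you can inspect the schema and row count first; a minimal check on the dataset DataFrame from above:

# Show the columns and their types
dataset.printSchema()

# Show how many rows the selection returned
print(dataset.count())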

Explore the data set

Get a feel for the data using different selections, filters and groupings.

display(dataset.select("PdDistrict").distinct())

display(dataset.filter(year("Dates") == 2003)
        .groupBy("Category", "Address")
        .count()
        .orderBy("Category", "Address"))

Prepare columns

First, derive the columns that the Spark ML feature transformers cannot produce directly, such as the date parts and the rounded coordinates.

dataset = dataset.withColumn("Year", year("Dates"))
dataset = dataset.withColumn("Month", month("Dates"))
dataset = dataset.withColumn("Hour", hour("Dates"))
dataset = dataset.withColumn("Lat", round("X", 5))
dataset = dataset.withColumn("Lon", round("Y", 5))
display(dataset.take(10))
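
VectorAssembler raises an error on null values, so it is worth checking for missing coordinates before going further; a minimal check, assuming the column names above:

# Count rows with a missing coordinate
print(dataset.filter(col("X").isNull() | col("Y").isNull()).count())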

Split data set

You will need to split the data set into a training set and a test set, so the model can be evaluated on data it has not seen.

trainingset, testset = dataset.randomSplit([0.7, 0.3])
print("trainingset: %s, testset: %s" % (trainingset.count(), testset.count()))

Prepare transformations and algorithm

The StringIndexers map each categorical string column to a numeric index, the VectorAssemblers combine columns into feature vectors, and the RandomForestClassifier is the algorithm that will be trained on those features.

category_indexer = StringIndexer(inputCol="Category", outputCol="category_index")
dayofweek_indexer = StringIndexer(inputCol="DayOfWeek", outputCol="dow_index")
district_indexer = StringIndexer(inputCol="PdDistrict", outputCol="dis_index")
resolution_indexer = StringIndexer(inputCol="Resolution", outputCol="res_index")

period_list = ["Year", "Month"]
period_assembler = VectorAssembler(inputCols=period_list, outputCol="period")

coord_list = ["Lat","Lon"]
coord_assembler = VectorAssembler(inputCols=coord_list, outputCol="coords")

feature_list = ["period","Hour", "coords", "dis_index","dow_index", "res_index"]

vector_assembler = VectorAssembler(inputCols=feature_list, outputCol="features")

random_forest = RandomForestClassifier(featuresCol="features",
                                       labelCol="category_index",
                                       maxBins=14000,
                                       maxDepth=10,
                                       impurity="entropy",
                                       featureSubsetStrategy="all")
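
By default a StringIndexer assigns indices in descending order of frequency, so index 0 goes to the most common label. To see the mapping it will produce, you can fit one indexer on its own (a quick check outside the pipeline):

# Fit the category indexer by itself and print its label-to-index mapping
category_model = category_indexer.fit(trainingset)
print(category_model.labels)  # position in this list is the assigned index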

Run the algorithm using the pipeline

The pipeline chains the indexers, the assemblers, and the classifier into a single estimator: fit runs all stages on the training set, and transform applies the fitted model to the test set.

stages = [category_indexer,
          dayofweek_indexer,
          district_indexer,
          resolution_indexer,
          period_assembler,
          coord_assembler,
          vector_assembler,
          random_forest]

pipeline = Pipeline(stages=stages)

model = pipeline.fit(trainingset)
prediction = model.transform(testset)
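
Before computing a metric, it can help to eyeball a few predictions next to the actual categories; the probability column holds the per-class scores the forest produced:

# Compare actual and predicted categories for a few test rows
display(prediction.select("Category", "category_index", "prediction", "probability").take(10))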

Check the result

The MulticlassClassificationEvaluator compares the predicted index with the actual category index and reports the accuracy.

evaluator = MulticlassClassificationEvaluator(labelCol="category_index",
                                              predictionCol="prediction",
                                              metricName="accuracy")

accuracy = evaluator.evaluate(prediction)

print("Random Forest accuracy = {}".format(accuracy))