This gist contains the complete code for my blogpost: 'Random Forest with Python and Spark ML'
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import matplotlib.pyplot as plt
import numpy as np
# Pull in the data (assumes an active SparkSession available as `spark`)
df = spark.sql("SELECT * FROM kings_county_housing")
# Define the workflow: assemble every column except the label into a single
# feature vector, then fit a random forest regressor on that vector
feature_list = []
for col in df.columns:
    if col == 'label':
        continue
    feature_list.append(col)

assembler = VectorAssembler(inputCols=feature_list, outputCol="features")
rf = RandomForestRegressor(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[assembler, rf])
# Hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [int(x) for x in np.linspace(start=10, stop=50, num=3)]) \
    .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start=5, stop=25, num=3)]) \
    .build()

crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=3)
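# Note: the RegressionEvaluator() above uses its defaults (labelCol="label",
# predictionCol="prediction", metricName="rmse"), which match the columns
# produced by the pipeline.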
# 80/20 train/test split; fit the cross-validated pipeline on the training
# data and score the held-out test data
(trainingData, testData) = df.randomSplit([0.8, 0.2])
cvModel = crossval.fit(trainingData)
predictions = cvModel.transform(testData)
# Evaluate
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
# Score the full dataset and collect it to the driver for plotting
rfPred = cvModel.transform(df)
rfResult = rfPred.toPandas()

# Plot predicted vs. actual price, annotated with the test RMSE
plt.plot(rfResult.label, rfResult.prediction, 'bo')
plt.xlabel('Price')
plt.ylabel('Prediction')
plt.suptitle("Model Performance RMSE: %f" % rmse)
plt.show()
round(233941.72 / rmse, 1) # factor of improvement over baseline
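# A minimal sketch (an assumption, not part of the original gist) of where a
# baseline RMSE like 233941.72 could come from: always predicting the mean
# training price.
from pyspark.sql import functions as F
mean_price = trainingData.agg(F.avg("label")).first()[0]
baseline_rmse = evaluator.evaluate(
    testData.withColumn("prediction", F.lit(float(mean_price))))
print('baseline rmse - ', round(baseline_rmse, 2))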
# Feature importance
bestPipeline = cvModel.bestModel
bestModel = bestPipeline.stages[1]
importances = bestModel.featureImportances.toArray()
x_values = list(range(len(importances)))

plt.bar(x_values, importances)
plt.xticks(x_values, feature_list, rotation=40)
plt.ylabel('Importance')
plt.xlabel('Feature')
plt.title('Feature Importances')
plt.show()
# Best hyperparameters
print('numTrees - ', bestModel.getNumTrees)
print('maxDepth - ', bestModel.getOrDefault('maxDepth'))
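# A minimal sketch (not part of the original gist) for inspecting the whole
# grid search: pair each parameter combination with its average
# cross-validation RMSE (lower is better).
for params, metric in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics):
    print({p.name: v for p, v in params.items()}, round(metric, 2))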