This gist contains the complete code for my blog post, 'Random Forest with Python and Spark ML'.
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
import matplotlib.pyplot as plt
import numpy as np
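# `mc` below is whatever SQL-capable context the blog post sets up earlier, and the
# kings_county_housing table is assumed to already be registered in the catalog.
# When running this gist on its own, a plain SparkSession (which exposes the same
# .sql() API) can stand in for it; the app name here is arbitrary.
from pyspark.sql import SparkSession
if 'mc' not in globals():
    mc = SparkSession.builder.appName("rf_housing").getOrCreate()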
# Pull in the data
df = mc.sql("SELECT * FROM kings_county_housing")
# Define the workflow
# Every column except the target ('label') goes into the feature vector
feature_list = []
for col in df.columns:
    if col == 'label':
        continue
    else:
        feature_list.append(col)
assembler = VectorAssembler(inputCols=feature_list, outputCol="features")
rf = RandomForestRegressor(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[assembler, rf])
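# Optional sanity check: VectorAssembler expects numeric input columns, so peeking
# at the assembled vector catches schema problems before the (slow) tuning step.
assembler.transform(df).select("features", "label").show(5, truncate=False)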
# Hyperparameter tuning
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [int(x) for x in np.linspace(start=10, stop=50, num=3)]) \
    .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start=5, stop=25, num=3)]) \
    .build()
crossval = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=RegressionEvaluator(),
                          numFolds=3)
(trainingData, testData) = df.randomSplit([0.8, 0.2])
cvModel = crossval.fit(trainingData)
predictions = cvModel.transform(testData)
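# Optional: CrossValidator records the mean metric for each grid entry (RMSE here,
# since RegressionEvaluator defaults to rmse), in the same order as paramGrid.
for params, avg_rmse in zip(paramGrid, cvModel.avgMetrics):
    print({p.name: v for p, v in params.items()}, '->', round(avg_rmse, 2))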
# Evaluate
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
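# Optional: also log the held-out RMSE to stdout (it otherwise only appears in the plot title)
print("Test RMSE: %.2f" % rmse)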
# Score the full dataset and pull it into pandas for the actual-vs-predicted plot
rfPred = cvModel.transform(df)
rfResult = rfPred.toPandas()
plt.plot(rfResult.label, rfResult.prediction, 'bo')
plt.xlabel('Price')
plt.ylabel('Prediction')
plt.suptitle("Model Performance RMSE: %f" % rmse)
plt.show()
print(round(233941.72 / rmse, 1))  # factor of improvement over the baseline (RMSE 233,941.72)
# Feature importance
bestPipeline = cvModel.bestModel
bestModel = bestPipeline.stages[1]
importances = bestModel.featureImportances.toArray()  # dense NumPy array for plotting
x_values = list(range(len(importances)))
plt.bar(x_values, importances)
plt.xticks(x_values, feature_list, rotation=40)
plt.ylabel('Importance')
plt.xlabel('Feature')
plt.title('Feature Importances')
plt.show()
# Best hyperparameters
print('numTrees - ', bestModel.getNumTrees)
print('maxDepth - ', bestModel.getOrDefault('maxDepth'))
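# Optional: dump every resolved parameter of the winning forest for the record
for param, value in bestModel.extractParamMap().items():
    print(param.name, '=', value)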