This gist contains the complete code for my blog post, 'Random Forest with Python and Spark ML'.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import matplotlib.pyplot as plt
import numpy as np
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
# Pull in the data.
# NOTE(review): `mc` is not defined in this file -- presumably a SQL context
# supplied by the hosting notebook/session; confirm before running elsewhere.
df = mc.sql("SELECT * FROM kings_county_housing")

# Define the workflow.
# Every column except the target column 'label' is used as a model input.
feature_list = [name for name in df.columns if name != 'label']

# Stage 1 packs the input columns into a single "features" vector column;
# stage 2 is the random forest regressor trained on that vector.
assembler = VectorAssembler(inputCols=feature_list, outputCol="features")
rf = RandomForestRegressor(labelCol="label", featuresCol="features")
pipeline = Pipeline(stages=[assembler, rf])
# Hyperparameter tuning
# Fixed seed for the train/test split and the cross-validation fold
# assignment.  Without it `randomSplit` draws a different split on every run,
# so the selected model and the reported RMSE are not repeatable.  (The
# forest's own seed is configured where `rf` is built and is not changed here.)
SEED = 42

# 3 x 3 grid: np.linspace yields numTrees in (10, 30, 50) and
# maxDepth in (5, 15, 25).
paramGrid = (
    ParamGridBuilder()
    .addGrid(rf.numTrees, [int(x) for x in np.linspace(start=10, stop=50, num=3)])
    .addGrid(rf.maxDepth, [int(x) for x in np.linspace(start=5, stop=25, num=3)])
    .build()
)

# 3-fold cross-validation over the whole pipeline.  The evaluator settings are
# spelled out (they equal RegressionEvaluator's defaults) so model selection
# visibly uses the same metric as the final evaluation.
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=paramGrid,
    evaluator=RegressionEvaluator(
        labelCol="label", predictionCol="prediction", metricName="rmse"
    ),
    numFolds=3,
    seed=SEED,
)

# Hold out 20% of the rows; only the remaining 80% is seen during tuning.
(trainingData, testData) = df.randomSplit([0.8, 0.2], seed=SEED)
cvModel = crossval.fit(trainingData)

# Predictions of the best model on the held-out rows.
predictions = cvModel.transform(testData)
# Evaluate
# RMSE of the tuned model on the held-out predictions.
evaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)

# Plot actual vs. predicted for the held-out rows only, so the figure shows
# the same data the RMSE in its title was computed on.  (Plotting predictions
# for the full DataFrame would mix in the rows the model was trained on and
# make the fit look better than the reported metric.)
# Only the two plotted columns are collected to the driver.
rfResult = predictions.select("label", "prediction").toPandas()
plt.plot(rfResult.label, rfResult.prediction, 'bo')
plt.xlabel('Price')
plt.ylabel('Prediction')
plt.suptitle("Model Performance RMSE: %f" % rmse)
plt.show()

# Factor of improvement over baseline.
# NOTE(review): the baseline RMSE is carried over from the original script;
# how it was obtained is not shown in this file.
BASELINE_RMSE = 233941.72
improvement_factor = round(BASELINE_RMSE / rmse, 1)
# Printed explicitly: a bare expression is silently discarded when this runs
# as a script rather than as the last line of a notebook cell.
print('Improvement over baseline - ', improvement_factor)
# Feature importance
# The best model is a fitted pipeline; stage 0 is the VectorAssembler and
# stage 1 is the fitted random forest.
bestPipeline = cvModel.bestModel
bestModel = bestPipeline.stages[1]

# featureImportances is a Spark ML vector; convert it to a NumPy array so
# matplotlib receives a plain numeric array.  Entries follow the order of
# `feature_list`, because that list was the assembler's inputCols.
importances = bestModel.featureImportances.toArray()
x_values = list(range(len(importances)))

plt.bar(x_values, importances, orientation='vertical')
plt.xticks(x_values, feature_list, rotation=40)
plt.ylabel('Importance')
plt.xlabel('Feature')
plt.title('Feature Importances')
# Render the chart explicitly, as is done for the performance plot; without
# this the figure is never displayed outside an inline-plotting notebook.
plt.show()
# Best hyperparameters
# Report the settings of the winning forest.  `getNumTrees` is read as an
# attribute (no call), exactly as in the original -- PySpark exposes it as a
# property on tree-ensemble models.
num_trees = bestModel.getNumTrees
print('numTrees - ', num_trees)
max_depth = bestModel.getOrDefault('maxDepth')
print('maxDepth - ', max_depth)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment