Skip to content

Instantly share code, notes, and snippets.

View GKarmakar's full-sized avatar

GKarmakar

View GitHub Profile
@GKarmakar
GKarmakar / code13
Last active December 19, 2018 04:09
#Save final model
# Databricks filesystem magic: list /FileStore/tables to inspect the save location (not plain Python).
%fs ls /FileStore/tables
# Persist the trained GBT model to DBFS, overwriting any previous copy at this path.
gradientBoostedTreeModel.write().overwrite().save("/FileStore/tables/gbtModel")
from pyspark.ml.classification import GBTClassificationModel
# Reload the persisted model from the same path (for later scoring sessions).
savedGradientBoostingTreeModel = GBTClassificationModel.load("/FileStore/tables/gbtModel")
#Train a Random Forest classifier and report train/test F1 scores.
from pyspark.ml.classification import RandomForestClassifier
# Bug fix: labelCol was "churn" (lowercase), which does not match the "Churn" column
# used by every other estimator in this file and cast to IntegerType during preprocessing.
RandomForestEstimator = RandomForestClassifier(featuresCol="rawFeatures", labelCol="Churn")
# Bug fix: fit() must be called on the configured estimator instance, not on the
# RandomForestClassifier class itself (the original discarded the configured estimator).
RandomForestModel = RandomForestEstimator.fit(preprocessedStageTraining3)
# Score both splits; transform() appends the "prediction" column consumed by the F1 helper.
predictionsRandomForestTrainingDF = RandomForestModel.transform(predictionsTrainingDF)
predictionsRandomForestTestDF = RandomForestModel.transform(predictionsTestDF)
display_train_and_test_f1_score("Random Forest", predictionsRandomForestTrainingDF, predictionsRandomForestTestDF)
#Train a Gradient Boosted Tree classifier and report train/test F1 scores.
from pyspark.ml.classification import GBTClassifier
GBTClassifierEstimator = GBTClassifier(featuresCol="rawFeatures", labelCol="Churn")
GBTClassifierModel = GBTClassifierEstimator.fit(preprocessedStageTraining3)
predictionsGBTClassifierTrainingDF = GBTClassifierModel.transform(predictionsTrainingDF)
# Bug fix: the test set must be scored with the fitted GBT model, not logisticRegressionModel
# (copy-paste error that reported the logistic-regression test F1 under the GBT heading).
predictionsGBTClassifierTestDF = GBTClassifierModel.transform(predictionsTestDF)
display_train_and_test_f1_score("Gradient Boosted Tree", predictionsGBTClassifierTrainingDF, predictionsGBTClassifierTestDF)
#Train a Logistic Regression classifier and report train/test F1 scores.
from pyspark.ml.classification import LogisticRegression
# Estimator configured on the assembled raw feature vector; "Churn" is the label column.
logisticRegressionEstimator = LogisticRegression(featuresCol="rawFeatures", labelCol="Churn")
# Fit on the fully preprocessed training set.
logisticRegressionModel = logisticRegressionEstimator.fit(preprocessedStageTraining3)
# Score both splits; transform() appends the "prediction" column consumed by the F1 helper.
predictionslogisticRegressionTrainingDF = logisticRegressionModel.transform(predictionsTrainingDF)
predictionslogisticRegressionTestDF = logisticRegressionModel.transform(predictionsTestDF)
display_train_and_test_f1_score("Logistic Regression", predictionslogisticRegressionTrainingDF, predictionslogisticRegressionTestDF)
#Train a Decision Tree classifier and report train/test F1 scores.
from pyspark.ml.classification import DecisionTreeClassifier
decisionTreeEstimator = DecisionTreeClassifier(featuresCol="rawFeatures", labelCol="Churn")
# Fit on the fully preprocessed training set.
decisionTreeModel = decisionTreeEstimator.fit(preprocessedStageTraining3)
# Score both the training and test splits with the fitted model.
predictionsDecisionTreeTrainingDF = decisionTreeModel.transform(predictionsTrainingDF)
predictionsDecisionTreeTestDF = decisionTreeModel.transform(predictionsTestDF)
display_train_and_test_f1_score("Decision Tree", predictionsDecisionTreeTrainingDF, predictionsDecisionTreeTestDF)
from pyspark.sql.functions import lit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
# Add a constant baseline prediction of 0.0 (presumably the majority / no-churn class) to both splits.
# NOTE(review): "baseline_predicton" is misspelled (missing an "i"); left as-is because downstream
# cells may reference this exact column name — confirm all usages before renaming.
predictionsTrainingDF = preprocessedStageTraining3.withColumn("baseline_predicton", lit(0.0))
predictionsTestDF = preprocessedStageTest3.withColumn("baseline_predicton", lit(0.0))
def display_train_and_test_f1_score(name,
predictionsTrainingDF,
predictionsTestDF,
predictionsColumn="prediction",
from pyspark.ml.feature import VectorAssembler, VectorIndexer
featureCols = ["Account_length",
"Area_code",
"International_plan",
"Voice_mail_plan",
"Number_vmail_messages",
"Total_day_calls",
"Total_day_charge",
"Total_eve_calls",
"Total_eve_charge",
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator
# Map the categorical string columns to numeric category indices.
indexerAreaCode = StringIndexer(inputCol = "Area_code", outputCol="area_codeIndexed")
indexerState = StringIndexer(inputCol="State", outputCol="stateIndexed")
# One-hot encode both indexed columns with a single estimator
# (OneHotEncoderEstimator is the pre-Spark-3 name for OneHotEncoder).
inputColumns = ["area_codeIndexed", "stateIndexed"]
outputColumns = ["area_codeEncoded", "stateEncoded"]
encoder = OneHotEncoderEstimator(inputCols=inputColumns, outputCols=outputColumns)
#Convert binary string columns (yes/no) to boolean true/false
from pyspark.sql.types import IntegerType
# NOTE(review): `col` is used below but `from pyspark.sql.functions import col` is not visible
# in this cell — presumably imported in an earlier cell; verify.
preprocessedStageTraining1 = (churnReducedTrainingDF
# Equality comparison turns the "Yes"/"No" strings into boolean columns.
.withColumn("International_plan", col("International_plan") == "Yes")
.withColumn("Voice_mail_plan", col("Voice_mail_plan") == "Yes")
# Cast the label to an integer so it can serve as labelCol for the classifiers.
.withColumn("Churn", col("Churn").astype(IntegerType()))
)
display(preprocessedStageTraining1)
#Remove highly correlated features
# Drop the *_minutes columns — per the comment above they are highly correlated
# (presumably with the corresponding *_charge columns — TODO confirm).
churnReducedDF = churnDF.drop("total_day_minutes", "total_eve_minutes", "total_night_minutes", "total_intl_minutes")
# 70/30 train/test split with a fixed seed (24) for reproducibility.
churnReducedTrainingDF, churnReducedTestDF = churnReducedDF.randomSplit([0.7, 0.3], 24)
# Action that materializes the split and shows the training row count.
churnReducedTrainingDF.count()