"""Basic example of data transformations for building (and training) Pipelines in spark.ml."""
import datetime
import json
import pprint
import os
import time

import numpy as np
import pandas as pd
from pandas_profiling import ProfileReport
from IPython.display import display

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.feature import IndexToString, OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

# pretty-printer used throughout the script
pp = pprint.PrettyPrinter(indent=4)
# simplest way to execute this script:
# /path/to/your/pyspark/virtualenv/bin/spark-submit --driver-memory 6g --executor-memory 12g <name of this script>
# json.load() expects a file object, not a path string
with open("/path/to/my/configs/dict.json") as f:
    configs = json.load(f)
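# Assumed shape of the configs dict, inferred from the keys referenced below
# (values here are purely illustrative placeholders, not from the original gist):
# {
#   "RESPONSE": "target",
#   "RESPONSE_LABELS": {"POS": "yes", "NEG": "no"},
#   "TRAIN_SET_PRIMARY_KEY": "id",
#   "colnames_types": {"some_numeric_col": "real", "some_categorical_col": "enum"},
#   "HYPERPARAMETER_RANGES": {"max_depth": [5, 10], "nbins": [32], "ntrees": [50, 100],
#                             "min_rows": [1], "min_split_improvement": [0.0], "sample_rate": [0.8]},
#   "MODELNAME_PREFIX": "my_rf_model",
#   "VERSION": 1
# }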
raw_data = pd.read_csv("/path/to/my/data.TSV", sep="\t")
print('Checking for empty strings (which can mess up spark one-hot encoding). '
      'You\'ll want to be sure none of the columns with empty strings are categorical columns')
for col in raw_data.columns:
    print(f'Inspecting column {col}')
    pp.pprint(raw_data[raw_data[col] == ''].index)
    display(raw_data.iloc[raw_data[raw_data[col] == ''].index])
    if len(raw_data[raw_data[col] == ''].index) > 0:
        raw_data.replace(r'^\s*$', np.nan, regex=True, inplace=True)
spark_session = SparkSession.builder.master('local').appName('myapp').getOrCreate()
spark_session.catalog.clearCache()
spark_session.sparkContext.setLogLevel("ERROR")
spark_session.conf.set('spark.sql.execution.pandas.convertToArrowArraySafely', 'true')
spark_session.conf.set('spark.sql.execution.arrow.pyspark.enabled', 'true')
print(spark_session.sparkContext.getConf().getAll())
print('Converting data to spark dataframe')
# shuffling training dataset as well
df = raw_data.sample(frac=1, random_state=64).reset_index(drop=True)
df_spark = spark_session.createDataFrame(df)
# account for class imbalances, see https://danvatterott.com/blog/2019/11/18/balancing-model-weights-in-pyspark/
y_collection = df_spark.select(configs['RESPONSE']).groupBy(configs['RESPONSE']).count().collect()
unique_y = [row[configs['RESPONSE']] for row in y_collection]
total_y = sum([row['count'] for row in y_collection])
unique_y_count = len(y_collection)
bin_count = [row['count'] for row in y_collection]
balancing_class_weights = {label: weight
for label, weight
in zip(unique_y, total_y / (unique_y_count * np.array(bin_count)))}
print(f"Balanced weights: {balancing_class_weights}")
df_spark = df_spark.withColumn('weight',
when(df_spark[configs['RESPONSE']] == configs['RESPONSE_LABELS']['POS'], balancing_class_weights[configs['RESPONSE_LABELS']['POS']])
.otherwise(balancing_class_weights[configs['RESPONSE_LABELS']['NEG']]))
print('Spark dataframe diagnostics:')
print('types: %s' % df_spark.dtypes)
print('row count: %d' % df_spark.count())
df_spark.show(n=3)  # .show() prints and returns None, so no need to wrap it in display()
assert isinstance(configs['colnames_types'], dict)
num_features = [
label for label in configs['colnames_types'] if
(
configs['colnames_types'][label] != 'enum'
and label != configs['RESPONSE']
and label != configs['TRAIN_SET_PRIMARY_KEY']
and label != 'weight'
)
]
cat_features = [
label for label in configs['colnames_types'] if
(
configs['colnames_types'][label] == 'enum'
and label != configs['RESPONSE']
and label != configs['TRAIN_SET_PRIMARY_KEY']
and label != 'weight'
)
]
print("Training features:")
print("Numeric:")
pp.pprint(num_features)
print("Categorical:")
pp.pprint(cat_features)
label_idxer = StringIndexer(inputCol=configs['RESPONSE'],
outputCol="label").fit(df_spark)
# fit here so the fitted model's "labels" attribute is available for the IndexToString reconversion stage below
feature_idxer = StringIndexer(inputCols=cat_features,
outputCols=[f"{f}_IDX" for f in cat_features],
handleInvalid="keep")
onehotencoder = OneHotEncoder(inputCols=feature_idxer.getOutputCols(),
outputCols=[f"{f}_OHE" for f in feature_idxer.getOutputCols()])
print("Assembling columns...")
pp.pprint(num_features + onehotencoder.getOutputCols())
assembler = VectorAssembler(inputCols=(num_features + onehotencoder.getOutputCols()),
outputCol="features")
rf = RandomForestClassifier(labelCol=label_idxer.getOutputCol(),
featuresCol=assembler.getOutputCol(),
weightCol="weight",
seed=123456789)
assert assembler.getOutputCol() == rf.getFeaturesCol()
assert label_idxer.getOutputCol() == rf.getLabelCol()
# https://stackoverflow.com/a/5969930/8236733
# pp.pprint(vars(rf))
pp.pprint(rf.extractParamMap())
pp.pprint(rf.getRawPredictionCol())
pp.pprint(rf.getProbabilityCol())
pp.pprint(rf.getPredictionCol())
label_converter = IndexToString(inputCol=rf.getPredictionCol(),
outputCol="prediction_label",
labels=label_idxer.labels)
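# IndexToString reverses the label indexing for readability: a prediction of 0.0 is mapped back
# to whichever original response value label_idxer assigned index 0, using the same
# label_idxer.labels ordering captured when it was fit above.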
pipeline = Pipeline(stages=[label_idxer, feature_idxer, onehotencoder,
assembler,
rf,
label_converter])  # type: pyspark.ml.pipeline.Pipeline (the Estimator; fitting it yields a PipelineModel)
# we would normally then do something like...
# pipeline_transformer = pipeline.fit(df_spark)
# prediction_df = pipeline_transformer.transform(df_spark)
# ...(see https://spark.apache.org/docs/latest/ml-pipeline.html#pipeline-components)
# ...but instead, we are going to use cross validation to optimize the random forest w/in the pipeline
rfparamGrid = ParamGridBuilder() \
.addGrid(rf.maxDepth, configs['HYPERPARAMETER_RANGES']['max_depth']) \
.addGrid(rf.maxBins, configs['HYPERPARAMETER_RANGES']['nbins']) \
.addGrid(rf.numTrees, configs['HYPERPARAMETER_RANGES']['ntrees']) \
.addGrid(rf.minInstancesPerNode, configs['HYPERPARAMETER_RANGES']['min_rows']) \
.addGrid(rf.minInfoGain, configs['HYPERPARAMETER_RANGES']['min_split_improvement']) \
.addGrid(rf.subsamplingRate, configs['HYPERPARAMETER_RANGES']['sample_rate']) \
.build()
print("\n\n\nGridsearch grids size / combinations")
print(len(rfparamGrid)) # see https://stackoverflow.com/a/49284287/8236733
if len(rfparamGrid) > 1:
    print(f'WARN: Note that a gridsearch of {len(rfparamGrid)} combinations will take that many times longer to train a model.'
          f'\nSo if a single run typically takes 45min, the gridsearch can increase that time to {45*len(rfparamGrid)}min')
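# One mitigation sketch (assuming the cluster has spare capacity): CrossValidator accepts a
# `parallelism` argument (Spark >= 2.3) that fits several candidate models concurrently, e.g.
#   CrossValidator(estimator=pipeline, estimatorParamMaps=rfparamGrid,
#                  evaluator=..., numFolds=5, parallelism=4)
# which trades extra memory/executors for wall-clock time.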
#crossval = CrossValidator(estimator=pipeline,
# estimatorParamMaps=rfparamGrid,
# evaluator=MulticlassClassificationEvaluator(
# labelCol=rf.getLabelCol(),
# predictionCol=rf.getPredictionCol(),
# metricName="weightedFMeasure", beta=2), # since my data is imbalanced, I'm using F2 scoring metric
# numFolds=5)
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=rfparamGrid,
evaluator=BinaryClassificationEvaluator(
labelCol="label",
rawPredictionCol=rf.getRawPredictionCol(),
metricName="areaUnderPR", # areaUnderPR, since there is outcome imbalance
weightCol="weight",),
numFolds=5) # type: pyspark.ml.tuning.CrossValidator
display(df_spark.head(n=3))
(train_u, test_u) = df_spark.randomSplit([0.8, 0.2])
print('training features:\n%s' % rf.getFeaturesCol())
print('training response field:\n%s' % rf.getLabelCol())
print('training set:')
print('types: %s' % train_u.dtypes)
display(train_u.head(n=3))
print('validation set:')
print('types: %s' % test_u.dtypes)
display(test_u.head(n=3))
assert train_u.dtypes == test_u.dtypes
#TODO: make 'weight' column more general and optional
assert 'weight' in train_u.columns
assert 'weight' in test_u.columns
max_train_time_hrs = 8
# fit the cross-validation estimator to get the optimal pipeline transformer/model
print(datetime.datetime.now())
best_rf_pipeline = crossval.fit(train_u)  # type: pyspark.ml.tuning.CrossValidatorModel
print(datetime.datetime.now())
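# Cross-validation diagnostics (sketch): avgMetrics lines up one-to-one with rfparamGrid, giving
# the mean evaluator score per hyperparameter combination, so the winning combination is easy to spot.
for params, metric in zip(rfparamGrid, best_rf_pipeline.avgMetrics):
    print({p.name: v for p, v in params.items()}, '->', metric)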
# now let's look at how it performs on the withheld test data, as well as inspecting some aspects of the random forest model w/in the pipeline
test_prediction = best_rf_pipeline.transform(test_u)
model_id = str(configs['MODELNAME_PREFIX']) + \
'_v' + str(configs['VERSION']) + \
't' + str(int(time.time()) * 1000)
evals = MulticlassClassificationEvaluator(labelCol=rf.getLabelCol(), predictionCol=rf.getPredictionCol())
statistics = {
"acc": evals.evaluate(test_prediction, {evals.metricName: "accuracy"}),
"recall": evals.evaluate(test_prediction, {evals.metricName: "weightedRecall"}),
"precision": evals.evaluate(test_prediction, {evals.metricName: "weightedPrecision"}),
"f1": evals.evaluate(test_prediction, {evals.metricName: "f1"}),
"f2": evals.evaluate(test_prediction, {evals.metricName: "weightedFMeasure", evals.beta: 2}),
}
print("Model Information")
for stat in statistics:
    print(stat + ": " + str(statistics[stat]))
print("Model Feature Importance:")
print(type(best_rf_pipeline))
print(type(best_rf_pipeline.bestModel))
for index, stage in enumerate(best_rf_pipeline.bestModel.stages):
    print(f"{index}: {type(stage)}")
# the fitted RandomForestClassificationModel is stage index 4 in
# [label_idxer, feature_idxer, onehotencoder, assembler, rf, label_converter]
best_rf = best_rf_pipeline.bestModel.stages[4]
pp.pprint(type(best_rf.featureImportances))
pp.pprint(best_rf.featureImportances)
pp.pprint(best_rf.featureImportances.toArray())
pp.pprint(best_rf.featureImportances[1])
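# Mapping importances back to column names (sketch; assumes the 'features' column carries the
# "ml_attr" metadata that VectorAssembler attaches to its output):
attrs = test_prediction.schema[assembler.getOutputCol()].metadata["ml_attr"]["attrs"]
name_by_idx = {attr["idx"]: attr["name"] for group in attrs.values() for attr in group}
ranked = sorted(((name_by_idx.get(i, f"feature_{i}"), imp)
                 for i, imp in enumerate(best_rf.featureImportances.toArray())),
                key=lambda kv: kv[1], reverse=True)
pp.pprint(ranked[:20])  # top 20 expanded (post-OHE) features by importance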