-
-
Save reedv/409df80f516ec17e330510365f75f558 to your computer and use it in GitHub Desktop.
Basic example of data transformations for building (and training) Pipelines in spark.ml
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime
import json
import os
import pprint
import time

import numpy as np
import pandas as pd
import pyspark
from IPython.display import display
from pandas_profiling import ProfileReport
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator, BinaryClassificationEvaluator
from pyspark.ml.feature import IndexToString, OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.sql import SparkSession
from pyspark.sql.functions import when
# Simplest way to execute this script:
#   /path/to/your/pyspark/virtualenv/bin/spark-submit --driver-memory 6g --executor-memory 12g <name of this script>

# json.load() reads from an open file object, not a path string, so open the config explicitly.
# NOTE(review): later code reads keys such as RESPONSE, RESPONSE_LABELS, colnames_types,
# TRAIN_SET_PRIMARY_KEY, HYPERPARAMETER_RANGES, MODELNAME_PREFIX and VERSION from this dict.
with open("/path/to/my/configs/dict.json", encoding="utf-8") as configs_file:
    configs = json.load(configs_file)
# Tab-separated training data.
raw_data = pd.read_csv("/path/to/my/data.TSV", sep="\t")
print('Checking for empty string (which can mess up spark onehotencoding). '
      'You\'ll want to be sure none of the columns with empty strings are categorical columns')
# Report every column's empty-string rows first, then blank them all in one pass.
# (Replacing inside the loop hid empty strings in every column inspected afterwards.)
found_empty_strings = False
for col in raw_data.columns:
    print(f'Inspecting column {col}')
    empty_rows = raw_data[raw_data[col] == ''].index
    pprint.pprint(empty_rows)
    # .loc, not .iloc: empty_rows holds index labels, not positions.
    display(raw_data.loc[empty_rows])
    if len(empty_rows) > 0:
        found_empty_strings = True
if found_empty_strings:
    # Turn empty and whitespace-only strings into NaN across the whole frame.
    # This must target raw_data, because the shuffled copy used later is derived from it.
    raw_data.replace(r'^\s*$', np.nan, regex=True, inplace=True)
# Start (or reuse) a local Spark session and make its output quiet and Arrow-enabled.
spark_session = (
    SparkSession.builder
    .master('local')
    .appName('myapp')
    .getOrCreate()
)
spark_session.catalog.clearCache()
spark_context = spark_session.sparkContext
spark_context.setLogLevel("ERROR")
# Arrow is used for pandas <-> Spark conversion; "ArraySafely" makes Arrow raise on unsafe casts.
for conf_key, conf_value in (
    ('spark.sql.execution.pandas.convertToArrowArraySafely', 'true'),
    ('spark.sql.execution.arrow.pyspark.enabled', 'true'),
):
    spark_session.conf.set(conf_key, conf_value)
print(spark_context.getConf().getAll())
print('Converting data to spark dataframe')
# Shuffle every row (fixed random_state for reproducibility) and renumber the index
# before handing the frame to Spark.
df = raw_data.sample(frac=1, random_state=64)
df = df.reset_index(drop=True)
df_spark = spark_session.createDataFrame(df)
# Inverse-frequency class weights to offset the imbalanced response:
#   weight(label) = total_rows / (n_distinct_labels * rows_with_label)
# see https://danvatterott.com/blog/2019/11/18/balancing-model-weights-in-pyspark/
response_col = configs['RESPONSE']
y_collection = df_spark.select(response_col).groupBy(response_col).count().collect()
unique_y = [row[response_col] for row in y_collection]
bin_count = [row['count'] for row in y_collection]
total_y = sum(bin_count)
unique_y_count = len(y_collection)
balancing_class_weights = dict(zip(unique_y, total_y / (unique_y_count * np.array(bin_count))))
print(f"Balanced weights: {balancing_class_weights}")

# Rows equal to the POS label get the POS weight; every other row gets the NEG weight.
# NOTE(review): assumes a binary response whose values match the configured POS/NEG labels
# (same type as the Spark column); a missing label here raises KeyError.
pos_label = configs['RESPONSE_LABELS']['POS']
neg_label = configs['RESPONSE_LABELS']['NEG']
is_positive = df_spark[response_col] == pos_label
df_spark = df_spark.withColumn(
    'weight',
    when(is_positive, balancing_class_weights[pos_label])
    .otherwise(balancing_class_weights[neg_label]),
)
# Quick sanity dump of the Spark frame: column types, row count, first rows.
print('Spark dataframe diagnostics:')
print('types: %s' % df_spark.dtypes)
print('row count: %d' % df_spark.count())
# DataFrame.show() prints the rows itself and returns None, so passing its result to
# display() only handed display() a None; call show() directly.
df_spark.show(n=3)
# Split configured columns into numeric and categorical ('enum') model features.
# The response, the primary key and the class-weight column are never features.
# NOTE(review): colnames_types is assumed to map column name -> type string; only 'enum'
# is treated as categorical, every other type as numeric.
if not isinstance(configs['colnames_types'], dict):
    # Explicit raise rather than assert, so the check is not stripped under `python -O`.
    raise TypeError(
        "configs['colnames_types'] must be a dict, got "
        f"{type(configs['colnames_types']).__name__}"
    )
non_feature_cols = {configs['RESPONSE'], configs['TRAIN_SET_PRIMARY_KEY'], 'weight'}
num_features = [
    label for label, col_type in configs['colnames_types'].items()
    if col_type != 'enum' and label not in non_feature_cols
]
cat_features = [
    label for label, col_type in configs['colnames_types'].items()
    if col_type == 'enum' and label not in non_feature_cols
]
# Show which columns will be used as numeric and categorical features.
# Uses pprint.pprint directly: the `pp` printer this used to call was never defined (NameError).
print("Training features:")
print("Numeric:")
pprint.pprint(num_features)
print("Categorical:")
pprint.pprint(cat_features)
# The label indexer is fit up front (not left to the pipeline) so its `labels`
# are available to build the IndexToString reconversion stage.
# NOTE(review): it is fit on the full dataset, before the train/test split.
label_idxer = StringIndexer(inputCol=configs['RESPONSE'],
                            outputCol="label").fit(df_spark)
# Index every categorical column; handleInvalid="keep" puts labels unseen at fit time
# into an extra index instead of failing.
feature_idxer = StringIndexer(inputCols=cat_features,
                              outputCols=[f"{f}_IDX" for f in cat_features],
                              handleInvalid="keep")
# One-hot encode the indexed categoricals.
onehotencoder = OneHotEncoder(inputCols=feature_idxer.getOutputCols(),
                              outputCols=[f"{f}_OHE" for f in feature_idxer.getOutputCols()])
# Numeric columns plus the one-hot vectors form the single "features" vector.
assembler_inputs = num_features + onehotencoder.getOutputCols()
print("Assembling columns...")
pprint.pprint(assembler_inputs)
assembler = VectorAssembler(inputCols=assembler_inputs,
                            outputCol="features")
# Random forest trained on the assembled vector against the indexed label,
# weighting rows by the class-balancing 'weight' column; seeded for reproducibility.
rf = RandomForestClassifier(labelCol=label_idxer.getOutputCol(),
                            featuresCol=assembler.getOutputCol(),
                            weightCol="weight",
                            seed=123456789)
# Sanity checks that the stages are wired to the same column names.
assert assembler.getOutputCol() == rf.getFeaturesCol()
assert label_idxer.getOutputCol() == rf.getLabelCol()
# Dump the estimator's params and output column names.
# (https://stackoverflow.com/a/5969930/8236733 shows vars(obj); extractParamMap() is used instead.)
pprint.pprint(rf.extractParamMap())
pprint.pprint(rf.getRawPredictionCol())
pprint.pprint(rf.getProbabilityCol())
pprint.pprint(rf.getPredictionCol())
# Map numeric predictions back to the original response labels, in the indexer's label order.
label_converter = IndexToString(inputCol=rf.getPredictionCol(),
                                outputCol="prediction_label",
                                labels=label_idxer.labels)
# Unfitted pipeline (Pipeline, not PipelineModel): label indexing -> categorical indexing ->
# one-hot -> assembly -> random forest -> label reconversion.
pipeline = Pipeline(stages=[label_idxer, feature_idxer, onehotencoder,
                            assembler,
                            rf,
                            label_converter])  # type: pyspark.ml.Pipeline
# We would normally then do something like...
#   pipeline_model = pipeline.fit(df_spark)
#   prediction_df = pipeline_model.transform(df_spark)
# ...(see https://spark.apache.org/docs/latest/ml-pipeline.html#pipeline-components)
# ...but instead we use cross validation to tune the random forest within the pipeline.
# Grid search space: each RF hyperparameter takes the candidate values listed under the
# matching configs['HYPERPARAMETER_RANGES'] key; build() yields every combination.
grid_builder = ParamGridBuilder()
for rf_param, range_key in (
    (rf.maxDepth, 'max_depth'),
    (rf.maxBins, 'nbins'),
    (rf.numTrees, 'ntrees'),
    (rf.minInstancesPerNode, 'min_rows'),
    (rf.minInfoGain, 'min_split_improvement'),
    (rf.subsamplingRate, 'sample_rate'),
):
    grid_builder.addGrid(rf_param, configs['HYPERPARAMETER_RANGES'][range_key])
rfparamGrid = grid_builder.build()

n_grid_combinations = len(rfparamGrid)
print("\n\n\nGridsearch grids size / combinations")
print(n_grid_combinations)  # see https://stackoverflow.com/a/49284287/8236733
if n_grid_combinations > 1:
    print(f'WARN: Note that gridsearch of {n_grid_combinations} combinations will take that many times longer to train a model.'
          f'\nSo if a single run typically takes 45min, the gridsearch can increase that time to {45 * n_grid_combinations}min')
# 5-fold cross validation of the whole pipeline over the grid.
# Models are ranked by weighted area under the precision-recall curve, chosen because the
# outcome is imbalanced. A possible alternative is
# MulticlassClassificationEvaluator(metricName="weightedFMeasure", beta=2), an F2 score.
pr_auc_evaluator = BinaryClassificationEvaluator(
    labelCol="label",
    rawPredictionCol=rf.getRawPredictionCol(),
    metricName="areaUnderPR",
    weightCol="weight",
)
crossval = CrossValidator(
    estimator=pipeline,
    estimatorParamMaps=rfparamGrid,
    evaluator=pr_auc_evaluator,
    numFolds=5,
)  # type: pyspark.ml.tuning.CrossValidator
# `dff` was never defined; the prepared Spark frame is df_spark.
display(df_spark.head(n=3))
# Hold out 20% for the final evaluation. The split is seeded so reruns on the same
# data and partitioning produce the same split.
(train_u, test_u) = df_spark.randomSplit([0.8, 0.2], seed=64)
print('training features:\n%s' % rf.getFeaturesCol())
print('training response field:\n%s' % rf.getLabelCol())
print('training set:')
print('types: %s' % train_u.dtypes)
display(train_u.head(n=3))
print('validation set:')
print('types: %s' % test_u.dtypes)
display(test_u.head(n=3))
# Sanity checks: both splits share a schema and carry the class-weight column.
assert train_u.dtypes == test_u.dtypes
# TODO: make 'weight' column more general and optional
assert 'weight' in train_u.columns
assert 'weight' in test_u.columns
max_train_time_hrs = 8  # NOTE(review): informational only; nothing in this script enforces it
# Fit the cross-validation estimator. The result is a CrossValidatorModel (not a
# PipelineModel); its .bestModel is the pipeline refit with the best grid parameters.
train_start = datetime.datetime.now()
print(train_start)
best_rf_pipeline = crossval.fit(train_u)  # type: pyspark.ml.tuning.CrossValidatorModel
train_end = datetime.datetime.now()
print(train_end)
print(f'Training took {train_end - train_start}')
# Now look at how the tuned pipeline performs on the withheld test data.
test_prediction = best_rf_pipeline.transform(test_u)
# Model identifier: <prefix>_v<version>t<epoch milliseconds>.
# Multiply before truncating; int(time.time()) * 1000 dropped the sub-second part.
model_id = (str(configs['MODELNAME_PREFIX'])
            + '_v' + str(configs['VERSION'])
            + 't' + str(int(time.time() * 1000)))
evals = MulticlassClassificationEvaluator(labelCol=rf.getLabelCol(), predictionCol=rf.getPredictionCol())
# Each evaluate() call passes a param map that overrides metricName (and beta for F2) for that call.
statistics = {
    "acc": evals.evaluate(test_prediction, {evals.metricName: "accuracy"}),
    "recall": evals.evaluate(test_prediction, {evals.metricName: "weightedRecall"}),
    "precision": evals.evaluate(test_prediction, {evals.metricName: "weightedPrecision"}),
    "f1": evals.evaluate(test_prediction, {evals.metricName: "f1"}),
    "f2": evals.evaluate(test_prediction, {evals.metricName: "weightedFMeasure", evals.beta: 2}),
}
# Print the test metrics, then inspect the fitted random forest inside the best pipeline.
print("Model Information")
for stat_name, stat_value in statistics.items():
    print(stat_name + ": " + str(stat_value))
print("Model Feature Importance:")
print(type(best_rf_pipeline))
print(type(best_rf_pipeline.bestModel))
for index, stage in enumerate(best_rf_pipeline.bestModel.stages):
    print(f"{index}: {type(stage)}")
# The fitted stages mirror the pipeline's stage order. Hard-coded index 3 is the
# VectorAssembler, which has no featureImportances, so locate the forest by the
# position of the `rf` estimator in the pipeline instead.
best_rf = best_rf_pipeline.bestModel.stages[pipeline.getStages().index(rf)]
pprint.pprint(type(best_rf.featureImportances))
pprint.pprint(best_rf.featureImportances)
pprint.pprint(best_rf.featureImportances.toArray())
pprint.pprint(best_rf.featureImportances[1])
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment