@sharmanirek
Created July 23, 2020 21:19
{"cells":[{"metadata":{},"cell_type":"markdown","source":"# Machine Learning Model Creation in Splice Machine\n#### Starting the Spark Session"},{"metadata":{"trusted":true},"cell_type":"code","source":"# Setup\nfrom pyspark.sql import SparkSession\nfrom splicemachine.spark import PySpliceContext\n\nspark = SparkSession.builder.getOrCreate()\nsplice = PySpliceContext(spark)","execution_count":2,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Importing MLflow Support\n<blockquote><p class='quotation'><span style='font-size:15px'> As explained in <a href='./7.2 Splice MLflow Support.ipynb'>7.2 Splice MLflow Support</a>, using MLflow on Splice Machine is extremely easy. See our <a href='https://pysplice.readthedocs.io/en/dbaas-4100/splicemachine.mlflow_support.html'>documentation</a> for the available functionality.<br><footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"# MLFlow Setup\nfrom splicemachine.mlflow_support import *\nmlflow.register_splice_context(splice)","execution_count":4,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Starting an experiment\n<blockquote><p class='quotation'><span style='font-size:15px'> Here we'll begin an experiment to keep track of our modeling efforts for this prediction task.<footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"mlflow.set_experiment('model_creation_demo')","execution_count":8,"outputs":[{"output_type":"stream","text":"INFO: 'model_creation_demo' does not exist. 
Creating a new experiment\n","name":"stdout"}]},{"metadata":{},"cell_type":"markdown","source":"## Starting a run\n<blockquote><p class='quotation'><span style='font-size:15px'> Here we'll begin a run to keep track of our modeling efforts in this notebook specifically.<footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"#start our first MLFlow run\nfrom datetime import datetime\n\ntags = {'team': 'Splice Machine', 'purpose': 'fraud DEMO'}\nmlflow.start_run(tags=tags, run_name=f\"RF_run\")","execution_count":10,"outputs":[{"output_type":"execute_result","execution_count":10,"data":{"text/plain":"<ActiveRun: >"},"metadata":{}}]},{"metadata":{},"cell_type":"markdown","source":"## Ingesting Data\n<blockquote><p class='quotation'><span style='font-size:15px'> Ingesting the table created in <a href='./7.3 Data Exploration.ipynb'>7.3 Data Exploration</a>, we will begin constructing a very simple Machine Learning Model. <footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"sql_query = \"SELECT * FROM cc_fraud_data\"\ndf = splice.df(sql_query)","execution_count":12,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Logging our first Parameter \n<blockquote><p class='quotation'><span style='font-size:15px'> We're utilizing MLFlow to keep track of the query we used to ingest the data for this modeling effort. 
<footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"# Logging our first parameter: the query we used to ingest our data\nmlflow.log_param(\"ingest_query\", sql_query)","execution_count":13,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Selecting Our Features\n<blockquote>Here we'll select only the features most strongly correlated with our target.<footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true,"scrolled":true},"cell_type":"code","source":"import pandas as pd\npdf = df.filter(df.CLASS_RESULT == 0).limit(900).toPandas()\\\n .append(df.filter(df.CLASS_RESULT == 1).limit(100).toPandas())\npdf = pdf.apply(pd.to_numeric)\ncorr = pdf.corr()\n\nmost_correlated = corr.abs()['CLASS_RESULT'].sort_values(ascending=False).reset_index()\nmost_correlated = most_correlated.iloc[1:].rename({\"index\":\"feature\",\"CLASS_RESULT\":\"correlation_to_target\"}, axis = 1)\nprint(most_correlated)","execution_count":9,"outputs":[{"output_type":"stream","text":" feature correlation_to_target\n1 TIME_OFFSET 0.861746\n2 ROLLING_AVG_DAILY_TRANS_AMNT 0.834575\n3 MACD_TRANS_CNT 0.826945\n4 ROLLING_AVG_WEEKLY_TRANS_AMNT 0.752052\n5 ROLLING_AVG_DAILY_TRANS_CNT 0.739858\n6 EXPECTED_DAILY_TRANS_CNT 0.708627\n7 AROON_TRANS_AMNT 0.705022\n8 EXPECTED_DAILY_TRANS_AMNT 0.696459\n9 RSI_TRANS_CNT 0.683260\n10 DAILY_TRANS_CNT 0.663161\n11 EXPECTED_WEEKLY_TRANS_AMNT 0.626029\n12 ROLLING_AVG_WEEKLY_TRANS_CNT 0.567841\n13 AROON_TRANS_CNT 0.545996\n14 EXPECTED_WEEKLY_TRANS_CNT 0.538239\n15 WEEKLY_TRANS_CNT 0.513323\n16 WEEKLY_TRANS_AMNT 0.475171\n17 DAILY_TRANS_AMNT 0.445158\n18 CREDIT_SCORE 0.435387\n19 CURRENT_BALANCE 0.251664\n20 ADX_TRANS_CNT 0.191480\n21 CREDIT_LIMIT 0.145593\n22 ADX_TRANS_AMNT 0.124552\n23 MACD_BALANCE 0.097921\n24 RSI_BALANCE 0.088247\n25 AROON_BALANCE 0.086883\n26 ROLLING_AVG_BALANCE 0.072767\n27 MACD_TRANS_AMNT 0.071346\n28 ADX_BALANCE 0.056237\n29 RSI_TRANS_AMNT 0.052941\n30 AMOUNT 
0.042056\n","name":"stdout"}]},{"metadata":{"trusted":true},"cell_type":"code","source":"CORRELATION_CUTOFF = 0.05\n#Logging this in mlflow\nmlflow.log_param(\"correlation_cutoff\", CORRELATION_CUTOFF)\n\nfeature_cols = list(most_correlated[most_correlated['correlation_to_target']>CORRELATION_CUTOFF]['feature'])\nprint(feature_cols)","execution_count":17,"outputs":[{"output_type":"stream","text":"['TIME_OFFSET', 'ROLLING_AVG_DAILY_TRANS_AMNT', 'MACD_TRANS_CNT', 'ROLLING_AVG_WEEKLY_TRANS_AMNT', 'ROLLING_AVG_DAILY_TRANS_CNT', 'EXPECTED_DAILY_TRANS_CNT', 'AROON_TRANS_AMNT', 'EXPECTED_DAILY_TRANS_AMNT', 'RSI_TRANS_CNT', 'DAILY_TRANS_CNT', 'EXPECTED_WEEKLY_TRANS_AMNT', 'ROLLING_AVG_WEEKLY_TRANS_CNT', 'AROON_TRANS_CNT', 'EXPECTED_WEEKLY_TRANS_CNT', 'WEEKLY_TRANS_CNT', 'WEEKLY_TRANS_AMNT', 'DAILY_TRANS_AMNT', 'CREDIT_SCORE', 'CURRENT_BALANCE', 'ADX_TRANS_CNT', 'CREDIT_LIMIT', 'ADX_TRANS_AMNT', 'MACD_BALANCE', 'RSI_BALANCE', 'AROON_BALANCE', 'ROLLING_AVG_BALANCE', 'MACD_TRANS_AMNT', 'ADX_BALANCE', 'RSI_TRANS_AMNT']\n","name":"stdout"}]},{"metadata":{},"cell_type":"markdown","source":"## Defining a Machine Learning Pipeline\n\n<blockquote>We'll use Spark's <code>Pipeline</code> class to define a set of <code>Transformers</code> that get your dataset ready for modeling<br>\nWe'll then use <code>mlflow</code> to <code>log</code> our Pipeline stages. Both <code>log_pipeline_stages</code> and <code>log_feature_transformations</code> are custom Splice Machine functions for tracking Spark Pipelines. 
</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"%%time\nfrom pyspark.ml.feature import StandardScaler, VectorAssembler\nfrom pyspark.ml import Pipeline,PipelineModel\nfrom pyspark.ml.classification import RandomForestClassifier, MultilayerPerceptronClassifier\n\n\"\"\"\nThe preprocessing stages for this example are: \n1) Vector assembling the feature columns \n2) Standardizing our feature columns\n\"\"\"\nmax_depth = 5 \nnum_trees = 20\n\nassembler = VectorAssembler(inputCols=feature_cols, outputCol='features')\nscaler = StandardScaler(inputCol=\"features\", outputCol='scaledFeatures')\nrf = RandomForestClassifier(featuresCol = 'scaledFeatures', labelCol = 'CLASS_RESULT', maxDepth = max_depth, numTrees = num_trees)\n\n# Pipeline to preprocess and model our data\nmlpipe = Pipeline(stages=[assembler,scaler, rf])\n\n# Custom Splice functions to add granularity and governance to your Spark Pipeline Models\nmlflow.log_pipeline_stages(mlpipe)\nmlflow.log_feature_transformations(mlpipe)","execution_count":21,"outputs":[{"output_type":"stream","text":"Warning: Transformer RandomForestClassifier_6be804c5bc8a could not be parsed. If this is a model, this is expected.\nWarning: Transformer RandomForestClassifier_6be804c5bc8a could not be parsed. If this is a model, this is expected.\nCPU times: user 91.8 ms, sys: 4.56 ms, total: 96.4 ms\nWall time: 1.14 s\n","name":"stdout"}]},{"metadata":{},"cell_type":"markdown","source":"## Separating our data for performance evaluation \n<blockquote> We are using a single train/test split to assess the performance of our simple model. Note that we have not investigated the class balance, and we are using untuned hyperparameters to predict the target variable. These can be adjusted as an exercise. 
<footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator\nfrom pyspark.ml.tuning import CrossValidator, ParamGridBuilder, CrossValidatorModel\nfrom splicemachine.stats import *\n\n#splitting our data into a training and testing set\n(train, test) = df.randomSplit([0.8, 0.2])\n\nmlflow.lp(\"train_ratio\", 0.80)","execution_count":19,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"## Fitting our model \n<blockquote> Here we train our model and log its execution time using Splice's custom <code>with mlflow.timer('timer_name')</code> block. Everything in the block will be timed, and then logged to mlflow under the timer name provided to the function. <footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true,"scrolled":true},"cell_type":"code","source":"with mlflow.timer('training'):\n fitted_model = mlpipe.fit(train)\n# Log the parameters of the fitted model\nmlflow.log_model_params(fitted_model)\n","execution_count":23,"outputs":[]},{"metadata":{"trusted":false},"cell_type":"markdown","source":"## Assessing our Model Performance\n<blockquote> Making predictions on the test set, evaluating performance, and logging the results to MLFlow <footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true,"scrolled":true},"cell_type":"code","source":"#Inference\npredictions = fitted_model.transform(test)\n\n#Performance Evaluation\nbinary_evaluator = SpliceBinaryClassificationEvaluator(spark, labelCol = \"CLASS_RESULT\")\nbinary_evaluator.input(predictions)\nperformance_metrics = binary_evaluator.get_results(as_dict = True)\n\n#Logging Performance\nmlflow.log_metrics(performance_metrics)","execution_count":29,"outputs":[{"output_type":"stream","text":"Current areaUnderROC: 0.8835821074497563\nCurrent areaUnderPR: 
0.7292677480397618\n+-----+----+-------+\n| |True| False|\n+-----+----+-------+\n| True|33.0| 7.0|\n|False|10.0|25205.0|\n+-----+----+-------+\n\n","name":"stdout"}]},{"metadata":{},"cell_type":"markdown","source":"## Logging Artifacts of this Run\n<blockquote> We can store the notebook associated with a particular run as well as the fitted model created by this run <footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"# Store the notebook for easy retrieval\nmlflow.log_artifact('7.5 Model Creation.ipynb', 'training_notebook')\n#Log the fitted model\nmlflow.log_model(fitted_model, 'rf_model')","execution_count":30,"outputs":[{"output_type":"stream","text":"Saving artifact of size: 18.99 KB to Splice Machine DB\nSaving artifact of size: 80.927 KB to Splice Machine DB\n","name":"stdout"}]},{"metadata":{},"cell_type":"markdown","source":"## Finish our run\n<blockquote>Now we'll end our run, and view the results in the <a href=\"/mlflow\">MLFlow UI</a>. We can look at our different runs, the parameters, metrics, tags and artifacts logged, and download our notebook directly. You'll know the run is complete from the small green check mark on the leftmost side of the run.</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"mlflow.end_run()","execution_count":31,"outputs":[]},{"metadata":{},"cell_type":"markdown","source":"# Fantastic!\n<blockquote> \nThis shows how our platform can be used to train and evaluate machine learning models! 
<br>\n Next Up: <a href='./7.6 Data Exploration.ipynb'>Using MLManager to Deploy Machine Learning Models</a>\n<footer>Splice Machine</footer>\n</blockquote>"},{"metadata":{"trusted":true},"cell_type":"code","source":"","execution_count":null,"outputs":[]}],"metadata":{"kernelspec":{"name":"python3","display_name":"Python 3","language":"python"},"language_info":{"name":"python","version":"3.7.6","mimetype":"text/x-python","codemirror_mode":{"name":"ipython","version":3},"pygments_lexer":"ipython3","nbconvert_exporter":"python","file_extension":".py"},"toc":{"nav_menu":{},"number_sections":false,"sideBar":true,"skip_h1_title":false,"base_numbering":1,"title_cell":"Table of Contents","title_sidebar":"Contents","toc_cell":false,"toc_position":{"height":"calc(100% - 180px)","width":"220px","left":"10px","top":"150px"},"toc_section_display":true,"toc_window_display":true}},"nbformat":4,"nbformat_minor":4}