MLeap + Algorithmia: When to leave your Spark pipeline behind for scalable deployment

Intro

Spark is a very powerful big data processing system capable of handling enormous workloads. Sometimes, though, there are critical paths that don't scale as well as you might want. In this blog post, we'll discuss Spark and Spark Pipelines, and how you can export a critical component of your Spark project to Algorithmia using the MLeap model interchange format and runtime.

What makes Spark great?

Apache Spark is, at its core, a distributed data transformation engine for very large datasets and workloads. It links directly with powerful, battle-tested distributed data systems like Hadoop and Cassandra, which are industry standard in fields such as finance.

Spark can process huge amounts of information across multiple machines, either as a stream or as a batch, parallelizing workflows that would otherwise have been very difficult. This is great; however, in the world of big data and data mining, there is one thing that vanilla Spark can't do: machine learning.
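
To make that concrete, here's a minimal sketch of an ordinary Spark batch job; the input path and column names (status, country) are hypothetical. Every step is a data transformation, but nothing in it learns from the data.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder()
  .appName("plain-batch-job")
  .getOrCreate()

// Hypothetical input path and column names, purely for illustration.
val events = spark.read
  .option("header", true)
  .csv("/data/events.csv")

// A classic batch transformation: filter, aggregate, write. Nothing here learns from the data.
events
  .filter(col("status") === "completed")
  .groupBy(col("country"))
  .count()
  .write
  .mode("overwrite")
  .parquet("/data/event-counts")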

What are Spark Pipelines?

This is where Spark Pipelines come into play. Pipelines give you a Spark data transformer that can learn, just like a neural network or a linear regression model. During the training process, you not only provide the inputs for your transformation, you then "fit" your model (essentially, training in ML parlance). Once your model is fitted, it's ready for testing and, finally, real-world workloads.

// Imports needed for this example.
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.Vectors

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println(s"Model 1 was fit using parameters: ${model1.parent.extractParamMap}")

Why you might want to export your Spark Pipeline model outside of Spark

Pipelines are pretty great: they allow you to train models that are used directly inside your Spark environment. But what if you decide later that the model you trained might be better used outside of Spark? Spark is a fantastic tool, but it's heavy; sometimes that heavyweight capability is overkill for a secondary project. Sometimes your workflow is running in Spark, but due to JVM overhead or other secondary effects your Spark jobs are starting to run slowly, or aren't reliably completing within an acceptable threshold. In these cases you can certainly try to fine-tune your Spark cluster, which can help, but there are tools out there that might be better suited for the task. If only you could pull your pipeline out of Spark and leverage those tools to the fullest...

MLeap!

MLeap is both an ML runtime framework and a model interchange format that is compatible with a variety of frameworks; it supports TensorFlow, scikit-learn, and Spark.

This is great, because it gives us a mechanism for exporting a Spark pipeline model into a non-Spark environment. Spark is pretty heavy, and it can be quite difficult to run on a microservice-based platform like Heroku, Seldon, or Algorithmia.

Using MLeap to export Spark pipelines

As mentioned, MLeap is a runtime environment, but it's also an ML interchange format. This means it has its own internal representation of your Spark pipeline, called a bundle. Let's look at how we can create one of these bundles from a Spark pipeline we've built:

import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.ml.feature.{Binarizer, StringIndexer}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import resource._

  val datasetName = "./examples/spark-demo.csv"

  val dataframe: DataFrame = spark.sqlContext.read.format("csv")
    .option("header", true)
    .load(datasetName)
    .withColumn("test_double", col("test_double").cast("double"))

  // Use out-of-the-box Spark transformers like you normally would
  val stringIndexer = new StringIndexer().
    setInputCol("test_string").
    setOutputCol("test_index")
  
  val binarizer = new Binarizer().
    setThreshold(0.5).
    setInputCol("test_double").
    setOutputCol("test_bin")
 
  val pipelineEstimator = new Pipeline()
    .setStages(Array(stringIndexer, binarizer))
  // now that our pipeline is created, we can fit it to the example data.
  val pipeline = pipelineEstimator.fit(dataframe)
  
  // -- Pre-built pipeline entrypoint -- //
  // If you already have a Spark pipeline that you've trained on your own data, you can skip the
  // previous steps and start from here with your fitted pipeline.
  
  // Pipeline serialization
  // As you can see, not only are we serializing the pipeline, we're serializing sample data along with it.
  // This is to ensure that the MLeap bundle not only has a representation of the model, but also the expected input and output structures to help lock down potentially dynamic model graphs.
  val sbc = SparkBundleContext().withDataset(pipeline.transform(dataframe))
  for(bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) {
    pipeline.writeBundle.save(bf)(sbc).get
  }

You can find the example data used during the model training/instancing here.

This Scala code, if run in a Spark REPL or in a Scala project connected to a running Spark node, will both create the Spark pipeline and serialize it, along with its IO schema, into a file on your system at /tmp/simple-spark-pipeline.zip (assuming you're on a Unix-based machine). This is the MLeap bundle we've been talking about, and it's ready to be ingested by a program leveraging the MLeap runtime environment; Spark is no longer required!
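
As a quick sanity check, here's a minimal sketch of loading that bundle and scoring a single row with only the MLeap runtime on the classpath (no Spark). The input values are made up, and the column names match the pipeline we just exported; the string value should be a label the indexer actually saw in spark-demo.csv.

import ml.combust.bundle.BundleFile
import ml.combust.mleap.core.types._
import ml.combust.mleap.runtime.MleapSupport._
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}
import resource._

// Load the bundle we just exported; only mleap-runtime is needed here, no Spark on the classpath.
val mleapPipeline = (for (bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) yield {
  bf.loadMleapBundle().get.root
}).opt.get

// Build a one-row leap frame matching the columns the pipeline was trained on.
val schema = StructType(
  StructField("test_string", ScalarType.String),
  StructField("test_double", ScalarType.Double)
).get
val frame = DefaultLeapFrame(schema, Seq(Row("hello", 0.8))) // made-up input values

// Run the same StringIndexer + Binarizer stages, now in plain JVM code.
val transformed = mleapPipeline.transform(frame).get
transformed.select("test_string", "test_index", "test_bin").get.dataset.foreach(println)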

How to run your exported model on Algorithmia

Now that you have an MLeap bundle containing your Spark Pipeline transformer, its weights, and the expected IO for the model, we can deploy it to Algorithmia. To start, you'll need to create a new Scala algorithm. In the algorithm creation wizard, you'll want to select Scala 2.x & sbt 1.3.x (Environments) from the "other" section of the language dropdown, then select Scala 2.X as the Environment in the dropdown below it. With that done, click Create New Algorithm.

With that done, we can get started on the Algorithm code itself. We'll need to add the MLeap dependencies and also change the Scala version we're using from 2.11.8 to 2.11.12. We can do this by clicking the DEPENDENCIES button in the web editor, which you can access by clicking "view source".

// Enter your dependencies here, using SBT syntax. Maven Central is available.
// http://search.maven.org/

// Examples:

// libraryDependencies += "org.apache.commons" % "commons-math3" % "3.4.1"

// libraryDependencies += "org.apache.commons" % "commons-csv" % "1.1"

libraryDependencies += "ml.combust.mleap" %% "mleap-runtime" % "0.16.0"
libraryDependencies += "ml.combust.mleap" %% "mleap-tensor" % "0.16.0"

scalaVersion := "2.11.12"

Save this, and our dependencies are set.

Next, we'll need the actual Algorithm code. For this, we recommend adding the InputExample class and updating the Algorithm class as shown in the gist below: https://gist.github.com/zeryx/4a14d4deb4d8dc2e7f8d1ee01c74bfd0

The model in the above code snippet is also publicly available, so you can use it in your own example for demo purposes. The easiest way to get started is to copy/paste the Algorithm.scala code into your main file, then create another file called InputExample.scala and copy that over as well.

Finally, if you want to see this fully functioning algorithm running on our platform, you can check it out here.

Conclusion

Spark is a potent framework, and there's a lot of value in using it for heavy, enterprise-scale workloads. However, if you're running into issues with the scalability or reliability of your Spark pipeline model, you don't have to be locked in. MLeap is a runtime and interchange format that lets you break away from Spark lock-in and run your models elsewhere, like on Algorithmia.
