spark-shell code used to create an example Spark ML pipeline and serialize it with MLeap
import ml.combust.bundle.BundleFile
import ml.combust.mleap.spark.SparkSupport._
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.bundle.SparkBundleContext
import org.apache.spark.ml.feature.{Binarizer, StringIndexer}
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
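// resource._ comes from the scala-arm library, which supplies managed() for
// automatic resource cleanup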
import resource._
val datasetName = "example-data.csv"
val dataframe: DataFrame = spark.sqlContext.read
  .format("csv")
  .option("header", true)
  .load(datasetName)
  .withColumn("test_double", col("test_double").cast("double"))
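// example-data.csv is not included with the gist; a hypothetical file that satisfies
// the pipeline (a "test_string" column and a numeric "test_double" column) might be:
//   test_string,test_double
//   foo,0.1
//   bar,0.9
//   foo,0.4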
// Use out-of-the-box Spark transformers like you normally would
val stringIndexer = new StringIndexer().setInputCol("test_string").setOutputCol("test_index")
val binarizer = new Binarizer().setThreshold(0.5).setInputCol("test_double").setOutputCol("test_bin")
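// Combine the transformers into a Pipeline estimator; fitting it yields a PipelineModel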
val pipelineEstimator = new Pipeline().setStages(Array(stringIndexer, binarizer))
val pipeline = pipelineEstimator.fit(dataframe)
// then serialize the fitted pipeline with MLeap; the SparkBundleContext needs the
// transformed dataset so MLeap can record each stage's input/output schema
val sbc = SparkBundleContext().withDataset(pipeline.transform(dataframe))
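// the jar:file: URI tells MLeap to write the bundle as a zip file; managed() closes it when done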
for(bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) {
  pipeline.writeBundle.save(bf)(sbc).get
}
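// To check the bundle round-trips, it can be loaded back with the MLeap runtime,
// which needs no SparkContext. A minimal sketch, assuming the standard MLeap
// loading API and the same two input columns as above (the row values are hypothetical):
import ml.combust.mleap.runtime.MleapSupport._
import ml.combust.mleap.core.types._
import ml.combust.mleap.runtime.frame.{DefaultLeapFrame, Row}

// deserialize the pipeline from the zip bundle written above
val mleapPipeline = (for(bf <- managed(BundleFile("jar:file:/tmp/simple-spark-pipeline.zip"))) yield {
  bf.loadMleapBundle().get.root
}).opt.get

// score one hypothetical row through MLeap's Spark-free LeapFrame
val schema = StructType(StructField("test_string", ScalarType.String),
  StructField("test_double", ScalarType.Double)).get
val frame = DefaultLeapFrame(schema, Seq(Row("foo", 0.7)))
val transformed = mleapPipeline.transform(frame).get
transformed.dataset.foreach(println)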