Apache Spark in data science presentation

This gist contains the code examples from an Apache Spark data science presentation.

// User-defined functions (UDFs)
import sqlContext.implicits._ // needed for toDF and $"..."; pre-imported in spark-shell

// Initial data
val colors = sc.parallelize(Array(
  "FFFFFF",
  "000000",
  "123456"
)).toDF("color")

// Convert a hex color string into an (r, g, b) tuple
def hex2rgb(s: String): (Int, Int, Int) = {
  val hex = Integer.parseInt(s, 16)
  val r = (hex & 0xFF0000) >> 16
  val g = (hex & 0xFF00) >> 8
  val b = hex & 0xFF
  (r, g, b)
}

// Register the function as a UDF
val hex2rgbUDF = sqlContext.udf
  .register("hex2rgb", (s: String) => hex2rgb(s))

colors
  .withColumn("rgb", hex2rgbUDF($"color"))
  .show()
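A quick sanity check of hex2rgb on the driver (plain Scala, no Spark needed); the expected triples follow directly from the bit arithmetic above:

// Plain-Scala sanity check of hex2rgb
assert(hex2rgb("FFFFFF") == (255, 255, 255))
assert(hex2rgb("000000") == (0, 0, 0))
assert(hex2rgb("123456") == (18, 52, 86)) // 0x12 = 18, 0x34 = 52, 0x56 = 86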
// Window functions

// Products with validity periods (defined first, since the windows below use it)
val products = sc.parallelize(Array(
  ("steak", "1990-01-01", "2000-01-01", 150),
  ("steak", "2000-01-02", "2010-01-01", 180),
  ("steak", "2010-01-02", "2020-01-01", 200),
  ("fish", "1990-01-01", "2020-01-01", 100)
)).toDF("name", "startDate", "endDate", "price")

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{months_between, lag, first}

val win1 = Window.partitionBy("name").orderBy("endDate")
val win2 = Window.partitionBy("name").orderBy("endDate").rowsBetween(Long.MinValue, 0)

products
  .withColumn("monthsFromLastUpdate", months_between($"endDate", lag("endDate", 1).over(win1)))
  .withColumn("origPriceUplift", $"price" - first($"price").over(win2))
  .show()

// Range join: match each order to the price that was valid on the order date
val orders = sc.parallelize(Array(
  ("1995-01-01", "steak"),
  ("2000-01-01", "fish"),
  ("2005-01-01", "steak"),
  ("2010-01-01", "fish"),
  ("2015-01-01", "steak")
)).toDF("date", "product")

orders
  .join(products, $"product" === $"name" && $"date" >= $"startDate" && $"date" <= $"endDate")
  .show()
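Note that the comparisons work on plain strings here because ISO-8601 dates compare correctly lexicographically. For reference, the join should produce one row per order, picking the price whose validity interval contains the order date:

// Expected matches (one row per order):
//   1995-01-01 steak -> price 150 (valid 1990-01-01..2000-01-01)
//   2000-01-01 fish  -> price 100
//   2005-01-01 steak -> price 180
//   2010-01-01 fish  -> price 100
//   2015-01-01 steak -> price 200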
// One-hot encoding of categorical features with an ML Pipeline
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, VectorAssembler}

val data = sc.parallelize(Array(
  ("M", "EN", 1.0),
  ("M", "ES", 0.0),
  ("F", "EN", 1.0),
  ("F", "ZH", 0.1)
)).toDF("gender", "language", "label")

// Define indexers (string -> numeric index) and encoders (index -> one-hot vector)
val fieldsToIndex = Array("gender", "language")
val indexers = fieldsToIndex.map(f => new StringIndexer()
  .setInputCol(f).setOutputCol(f + "_index"))

val fieldsToEncode = Array("gender", "language")
val oneHotEncoders = fieldsToEncode.map(f => new OneHotEncoder()
  .setInputCol(f + "_index").setOutputCol(f + "_flags"))

// Assemble the encoded columns into a single feature vector
val featureAssembler = new VectorAssembler()
  .setInputCols(Array("gender_flags", "language_flags"))
  .setOutputCol("features")

// Combine stages into a pipeline
val pipeline = new Pipeline().setStages(indexers ++ oneHotEncoders :+ featureAssembler)

pipeline
  .fit(data)
  .transform(data)
  .drop("gender_flags")
  .drop("language_flags")
  .show()
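To see which category each index maps to, you can inspect the fitted indexer stages. A minimal sketch, assuming the fitted stages come back in the same order they were passed to setStages:

import org.apache.spark.ml.feature.StringIndexerModel

val model = pipeline.fit(data)
// StringIndexerModel.labels lists categories ordered by descending frequency
// (index 0 = most frequent value in the fitting data).
model.stages.collect { case m: StringIndexerModel =>
  println(s"${m.getOutputCol}: ${m.labels.mkString(", ")}")
}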
// Logistic regression on the Iris dataset
import sqlContext.implicits._
import org.apache.spark.sql.functions.when

// Data from the UCI Machine Learning Repository:
// https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data
val url = "https://archive.ics.uci.edu/ml/machine-learning-databases/iris/iris.data"
val data = sc.parallelize(
  scala.io.Source
    .fromURL(url)
    .mkString
    .split("\n")
    .filter(_.nonEmpty) // the file ends with a blank line, which would break the pattern match
    .map(_.split(","))
    .map({ case Array(f1, f2, f3, f4, label) => (f1.toDouble, f2.toDouble, f3.toDouble, f4.toDouble, label) })
)
  .toDF("sepal_length", "sepal_width", "petal_length", "petal_width", "label")
  // Keep two classes for binary classification and recode the label as 0.0/1.0
  .filter($"label".isin("Iris-versicolor", "Iris-virginica"))
  .withColumn("label", when($"label" === "Iris-versicolor", 0.0).otherwise(1.0))
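A quick check that both classes survived the filter (the Iris dataset has 50 rows per class):

// Sanity check: expect two labels, 50 rows each
data.groupBy("label").count().show()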
// Cross-validated logistic regression
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.tuning.{ParamGridBuilder, CrossValidator}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed = 12345)

val featureAssembler = new VectorAssembler()
  .setInputCols(Array("sepal_length", "sepal_width", "petal_length", "petal_width"))
  .setOutputCol("features")

val lr = new LogisticRegression()
  .setMaxIter(10)

val fullPipeline = new Pipeline().setStages(Array(featureAssembler, lr))

// Grid of regularization parameters to search over
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(0.1, 0.01))
  .build()

// 5-fold cross-validation, scored by areaUnderROC (the evaluator's default metric)
val cv = new CrossValidator()
  .setEstimator(fullPipeline)
  .setEvaluator(new BinaryClassificationEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(5)

val cvModel = cv.fit(training)
cvModel.avgMetrics // mean cross-validation metric for each point in the grid
cvModel.transform(test).show()
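To put a single number on the held-out split, the same evaluator can be reused on the model's test-set predictions; a minimal sketch:

// Score the cross-validated model on the held-out test set
val evaluator = new BinaryClassificationEvaluator() // defaults to areaUnderROC
val testAUC = evaluator.evaluate(cvModel.transform(test))
println(s"Test AUC: $testAUC")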