{"metadata":{"kernelspec":{"display_name":"Scala","language":"scala","name":"scala"},"language_info":{"codemirror_mode":"text/x-scala","file_extension":".scala","mimetype":"","name":"Scala","nbconverter_exporter":"","version":"2.11.12"}},"nbformat_minor":2,"nbformat":4,"cells":[{"cell_type":"markdown","source":"# Machine Learning Data Conversions Accross Frameworks\n\nEach machine learning framework is using it's own data model to represent features and labels. For all of them it is possible to use raw arrays of doubles to initialize the data.\n\nIn this document we demonstrate how to convert the data between the different models.\n- DL4J\n- Spark MLib \n- Shogun\n\nThese examples are implemented in Scala using the [BeakerX](http://beakerx.com/) Jupyter kernel.\n\n## Setup\nWe load all the necessary dependencies with the help of Maven","metadata":{}},{"cell_type":"code","source":"%%classpath add mvn \norg.deeplearning4j:deeplearning4j-core:1.0.0-beta2\norg.nd4j:nd4j-native-platform:1.0.0-beta2\norg.apache.spark:spark-sql_2.11:2.3.2\norg.apache.spark:spark-mllib_2.11:2.3.2\norg.shogun:shogun-jvm:0.0.1-SNAPSHOT","metadata":{"trusted":true},"execution_count":133,"outputs":[{"output_type":"display_data","data":{"application/vnd.jupyter.widget-view+json":{"model_id":"","version_major":2,"version_minor":0},"method":"display_data"},"metadata":{}}]},{"cell_type":"markdown","source":"We initialize Shogun and Maven. Nothing needs to be done for DL4J","metadata":{}},{"cell_type":"code","source":"import org.apache.spark.sql.SparkSession\nimport org.shogun.ShogunJVM\n\n// setup Shogun\nShogunJVM.load()\n\n// setup Spark\nval spark = SparkSession.builder()\n .appName(\"Conversions\")\n .master(\"local\")\n .config(\"spark.ui.enabled\", \"false\")\n .getOrCreate()","metadata":{"trusted":true},"execution_count":59,"outputs":[{"execution_count":59,"output_type":"execute_result","data":{"text/plain":"org.apache.spark.sql.SparkSession@38c711ca"},"metadata":{}}]},{"cell_type":"markdown","source":"We load some initial Data with the help of the DL4J IrisDataSetIterator. We load all data (150 record) into one Dataset.","metadata":{}},{"cell_type":"code","source":"import org.deeplearning4j.datasets.iterator.impl.IrisDataSetIterator;\n\nvar size = 150;\nvar iter = new IrisDataSetIterator(150, 150);\nvar irisDataSet = iter.next();\n\n\"Features: \" + irisDataSet.numInputs() +\" / Examples: \"+irisDataSet.numExamples()\n","metadata":{"trusted":true},"execution_count":132,"outputs":[{"execution_count":132,"output_type":"execute_result","data":{"text/plain":"Features: 4 / Examples: 150"},"metadata":{}}]},{"cell_type":"code","source":"irisDataSet.getClass()","metadata":{"trusted":true},"execution_count":116,"outputs":[{"execution_count":116,"output_type":"execute_result","data":{"text/plain":"class org.nd4j.linalg.dataset.DataSet"},"metadata":{}}]},{"cell_type":"markdown","source":"## Convet DL4J Dataset to Spark Dataframe","metadata":{}},{"cell_type":"markdown","source":"We can parallelize the DataSet by converting it to a Seq. A call to getLabels() and getFeatures() gives a INDArray which we need to convert into a vector of doubles.\n\nThe labels are one hot encoded. 
## Convert a DL4J DataSet to a Spark DataFrame

We can parallelize the DataSet by converting it to a Seq. A call to `getLabels()` and `getFeatures()` gives an INDArray, which we need to convert into a vector of doubles. The labels are one-hot encoded, so we translate each one back to its index position; the features need to be wrapped with `Vectors.dense()`.

```scala
import spark.implicits._
import scala.collection.JavaConversions._
import org.apache.spark.ml.linalg.Vectors

var sparkData = spark.sparkContext
    .parallelize(irisDataSet.asList)
    .map(row => (row.getLabels().toDoubleVector().indexOf(1.0), Vectors.dense(row.getFeatures().toDoubleVector())))
    .toDF("label", "features")

sparkData.printSchema
sparkData.show
```

Output:

```
root
 |-- label: integer (nullable = false)
 |-- features: vector (nullable = true)
```

`show` then prints the first 20 rows: the `label` column holds the class index and the `features` column the four measurements as a dense vector (values such as 5.09999990463256 show float rounding because ND4J stores the data as 32-bit floats).

We can also drive the conversion from the iterator itself, in order to load the data in batches:

```scala
var iter = new IrisDataSetIterator(10, 150);
var sparkData = spark.sparkContext
    .parallelize(iter.toSeq)
    .flatMap(rec => rec)
    .map(row => (row.getLabels().toDoubleVector().indexOf(1.0), Vectors.dense(row.getFeatures().toDoubleVector())))
    .toDF("label", "features")

sparkData.count
```

Output: `150`
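Because the converted DataFrame uses the default `label`/`features` column names, it can be fed directly to Spark MLlib estimators. The following is a minimal sketch (not part of the original notebook, and assuming the `sparkData` DataFrame from above is still in scope) that fits a logistic regression on the converted Iris data:

```scala
import org.apache.spark.ml.classification.LogisticRegression

// Fit a logistic regression on the converted DataFrame;
// the default "label" and "features" columns are picked up automatically.
val lr = new LogisticRegression()
    .setMaxIter(50)

val model = lr.fit(sparkData)

// Predict on the training data just to confirm the round trip works.
model.transform(sparkData).select("label", "prediction").show(5)
```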
## Convert a DL4J DataSet to Shogun Features and Labels

DL4J can expose the data as a matrix of doubles via `toDoubleMatrix()`. The labels are one-hot encoded, so we convert each row back to an array holding the single class index. The double matrices are then used to create jblas `DoubleMatrix` objects, from which the Shogun `RealFeatures` and `MulticlassLabels` are built.

```scala
import org.jblas._
import org.shogun._

var labelsAsDoubles = irisDataSet.getLabels().toDoubleMatrix().map(r => Array(r.indexOf(1.0).toDouble)).toArray
var shogunLabels = new MulticlassLabels(new DoubleMatrix(labelsAsDoubles).transpose)
var shogunFeatures = new RealFeatures(new DoubleMatrix(irisDataSet.getFeatures().toDoubleMatrix()).transpose)

shogunFeatures.get_num_features + " / " + shogunFeatures.get_num_vectors + " / " + shogunLabels.get_num_labels
```

Output: `4 / 150 / 150`

## Convert a Spark DataFrame to a DL4J DataSet

We collect the feature vectors and the labels from the DataFrame as arrays of doubles and create the NDArrays with `Nd4j.create()`:

```scala
import org.nd4j.linalg.dataset.DataSet
import org.nd4j.linalg.factory.Nd4j
import scala.reflect.ClassTag
import org.apache.spark.ml.linalg.DenseVector;
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct)

var sparkFeatures = sparkData.select("features").map(row => row.get(0).asInstanceOf[DenseVector].values).collect.toArray
var sparkLabels = sparkData.select("label").map(row => Array(row.get(0).asInstanceOf[Integer].toDouble)).collect.toArray

var labels = Nd4j.create(sparkLabels)
var features = Nd4j.create(sparkFeatures)

var dataset = new DataSet(features, labels)

dataset.size
```

Output: `150`
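Once the data is back in a DL4J DataSet it can be used like any other DL4J data source. As a small illustrative sketch (not part of the original notebook), we can shuffle it and split it into training and test sets using the standard `DataSet` methods:

```scala
// Shuffle the rebuilt DataSet and split it 80/20 into train and test sets.
dataset.shuffle()
val split = dataset.splitTestAndTrain(0.8)

val trainData = split.getTrain()
val testData = split.getTest()

trainData.numExamples() + " train / " + testData.numExamples() + " test"
```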
## Convert a Spark DataFrame to Shogun Features and Labels

We just need to convert the Spark DataFrame to arrays of double values. From these we can create the `DoubleMatrix` objects that are used to build the labels and features. The `DoubleMatrix` needs to be transposed to give the correct dimensions.

```scala
import org.jblas._
import org.shogun._

var sparkFeaturesArray = sparkData.select("features").map(row => row.get(0).asInstanceOf[DenseVector].values).collect.toArray
var sparkLabelsArray = sparkData.select("label").map(row => Array(row.getInt(0).toDouble)).collect.toArray

var shogunFeatures = new RealFeatures(new DoubleMatrix(sparkFeaturesArray).transpose)
var shogunLabels = new MulticlassLabels(new DoubleMatrix(sparkLabelsArray).transpose)

shogunFeatures.get_num_features + " / " + shogunFeatures.get_num_vectors + " / " + shogunLabels.get_num_labels
```

Output: `4 / 150 / 150`

## Convert Shogun Features and Labels to a DL4J DataSet

We convert the label vector into a 2D array; the feature matrix can be used directly to create the NDArray objects.

```scala
var label2DArray = shogunLabels.get_labels.toArray.map(value => Array(value))
var labels = Nd4j.create(label2DArray)
var features = Nd4j.create(shogunFeatures.get_feature_matrix().transpose().toArray2())

var dataset = new DataSet(features, labels)

dataset.size
```

Output: `150`

## Convert Shogun Features and Labels to a Spark DataFrame

We concatenate the labels and features into a single `DoubleMatrix`. Then we convert the rows into a collection of `LabeledPoint`. Finally we parallelize the collection and convert it to a DataFrame.

```scala
import org.apache.spark.sql.types._
import org.apache.spark.ml.feature.LabeledPoint
import org.apache.spark.ml.linalg.Vectors
import spark.implicits._

var concatArray = DoubleMatrix.concatHorizontally(shogunLabels.get_labels().transpose, shogunFeatures.get_feature_matrix().transpose).toArray2()
var labeledPointArray = concatArray.map(row => LabeledPoint(row(0), Vectors.dense(row(1), row(2), row(3), row(4))))
var sparkData = spark.sparkContext
    .parallelize(labeledPointArray)
    .toDF()

sparkData.printSchema
sparkData.show
```

Output:

```
root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
```

`show` again prints the first 20 rows, now with the label as a double (`0.0`) and the same feature vectors.
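As a quick sanity check of this last round trip (a sketch that was not part of the original notebook), we can count the examples per class in the resulting DataFrame; for the Iris data each of the three classes should appear 50 times.

```scala
// Count examples per class; the Iris data set has 50 examples per class.
sparkData.groupBy("label").count().orderBy("label").show()
```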