Skip to content

Instantly share code, notes, and snippets.

@jaceklaskowski
Last active June 14, 2017 21:47
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jaceklaskowski/560bea4edaf7b8634d982b4ebe2a8094 to your computer and use it in GitHub Desktop.
Save jaceklaskowski/560bea4edaf7b8634d982b4ebe2a8094 to your computer and use it in GitHub Desktop.
notes

IDEA:

  • breaks on demand given the number of exercises
  • break man who says we should have one
val wholeJsonRDD = sc.wholeTextFiles("input.json").map(_._2)
val mySchema = new StructType().add($"n".int)
wholeJsonRDD.toDF.withColumn("json", from_json($"value", mySchema)).show(truncate = false)

val jsonDF=spark.read.json("output.json")
jsonDF.printSchema

scala> jsonDF.printSchema
root
 |-- orderId: string (nullable = true)
 |-- orderLines: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- productId: string (nullable = true)
 |    |    |-- quantity: long (nullable = true)
 |    |    |-- sequence: long (nullable = true)
 |    |    |-- totalPrice: struct (nullable = true)
 |    |    |    |-- gross: long (nullable = true)
 |    |    |    |-- net: long (nullable = true)
 |    |    |    |-- tax: long (nullable = true)


scala> jsonDF.withColumn("quantity", explode($"orderLines.quantity")).groupBy("orderId").agg(sum("quantity")).show
+-------+-------------+
|orderId|sum(quantity)|
+-------+-------------+
|    oi1|            4|
+-------+-------------+

scala> jsonDF.withColumn("quantity", explode($"orderLines.quantity")).show(false)
+-------+--------------------------------------------+--------+
|orderId|orderLines                                  |quantity|
+-------+--------------------------------------------+--------+
|oi1    |[[p1,1,1,[50,40,10]], [p2,3,2,[300,240,60]]]|1       |
|oi1    |[[p1,1,1,[50,40,10]], [p2,3,2,[300,240,60]]]|3       |
+-------+--------------------------------------------+--------+

scala> jsonDF.withColumn("quantity", explode($"orderLines.quantity")).show(false)
+-------+--------------------------------------------+--------+
|orderId|orderLines                                  |quantity|
+-------+--------------------------------------------+--------+
|oi1    |[[p1,1,1,[50,40,10]], [p2,3,2,[300,240,60]]]|1       |
|oi1    |[[p1,1,1,[50,40,10]], [p2,3,2,[300,240,60]]]|3       |
+-------+--------------------------------------------+--------+

scala> jsonDF.select("orderLines.quantity").show(false)
+--------+
|quantity|
+--------+
|[1, 3]  |
+--------+

scala> jsonDF.withColumn("quantity", explode($"orderLines.quantity")).groupBy("orderId").agg(sum("quantity")).explain
== Physical Plan ==
*HashAggregate(keys=[orderId#0], functions=[sum(quantity#137L)])
+- Exchange hashpartitioning(orderId#0, 200)
   +- *HashAggregate(keys=[orderId#0], functions=[partial_sum(quantity#137L)])
      +- *Project [orderId#0, quantity#137L]
         +- Generate explode(orderLines#1.quantity), true, false, [quantity#137L]
            +- *FileScan json [orderId#0,orderLines#1] Batched: false, Format: JSON, Location: InMemoryFileIndex[file:/home/TC/Spark/spark-2.1.1-bin-hadoop2.6/output.json], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<orderId:string,orderLines:array<struct<productId:string,quantity:bigint,sequence:bigint,to...
val jsonDF=spark.read.json("output.json")
jsonDF.
  as[(String, Seq[(String, Long, Long, (Long, Long, Long))])].
  flatMap { case (orderId, orderLines) => Seq((orderId, 0), (orderId, 1)) }.
  toDF("orderId", "num").
  show

scala> jsonDF.select("orderId", "orderLines.quantity").as[(String, Seq[Long])].flatMap { case (oid, qs) => qs.map((oid, _)) }.show
+---+---+
| _1| _2|
+---+---+
|oi1|  1|
|oi1|  3|
+---+---+

scala> jsonDF.select("orderId", "orderLines.quantity").as[(String, Seq[Long])].flatMap { case (oid, qs) => qs.map((oid, _)) }.toDF("orderId", "quantity").groupBy("orderId").agg(sum("quantity") as "sum").show
+-------+---+
|orderId|sum|
+-------+---+
|    oi1|  4|
+-------+---+

Can we avoid groupBy? YES!

scala> jsonDF.select("orderId", "orderLines.quantity").as[(String, Seq[Long])].map { case (oid, qs) => (oid, qs.sum) }.toDF("orderId", "sum").show
+-------+---+
|orderId|sum|
+-------+---+
|    oi1|  4|
+-------+---+

The physical plan is better. Much better!

scala> jsonDF.select("orderId", "orderLines.quantity").as[(String, Seq[Long])].map(t => (t._1, t._2.sum)).toDF("orderId", "sum").show
+-------+---+
|orderId|sum|
+-------+---+
|    oi1|  4|
+-------+---+

Can we avoid serde using map? YES!

scala> jsonDF.select("orderId", "orderLines.quantity").as[(String, Seq[Long])].map { case (oid, qs) => (oid, qs.sum) }.toDF("orderId", "sum").show
+-------+---+
|orderId|sum|
+-------+---+
|    oi1|  4|
+-------+---+

scala> jsonDF.select("orderId", "orderLines.quantity").map { r => (r.getString(0), r.getSeq[Long](1).sum) }.toDF("orderId", "sum").show
+-------+---+
|orderId|sum|
+-------+---+
|    oi1|  4|
+-------+---+

case class MyRecord(orderId: String, quantities: Seq[Long])
scala> jsonDF.select($"orderId", $"orderLines.quantity" as "quantities").as[MyRecord].map { mr => (mr.orderId, mr.quantities.sum) }.toDF("orderId", "sum").show
+-------+---+
|orderId|sum|
+-------+---+
|    oi1|  4|
+-------+---+
scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> jsonDF.select("orderId", "orderLines.quantity").map { case Row(oid: String, qs: Seq[Long]) => (oid, qs.sum) }.toDF("orderId", "sum").show
<console>:29: warning: non-variable type argument Long in type pattern Seq[Long] (the underlying of Seq[Long]) is unchecked since it is eliminated by erasure
       jsonDF.select("orderId", "orderLines.quantity").map { case Row(oid: String, qs: Seq[Long]) => (oid, qs.sum) }.toDF("orderId", "sum").show
                                                                                       ^
+-------+---+
|orderId|sum|
+-------+---+
|    oi1|  4|
+-------+---+
case class MyRecord(orderId: String, quantities: Seq[Long])
scala> jsonDF.select($"orderId", $"orderLines.quantity" as "quantities").as[MyRecord].map { mr => (mr.orderId, mr.quantities.sum) }.toDF("orderId", "sum").show
+-------+---+
|orderId|sum|
+-------+---+
|    oi1|  4|
+-------+---+
import org.apache.spark.sql.Row
scala> jsonDF.select("orderId", "orderLines.quantity").map { case Row(oid: String, qs: Seq[Long]) => (oid, qs.sum) }.toDF("orderId", "sum").show
<console>:29: warning: non-variable type argument Long in type pattern Seq[Long] (the underlying of Seq[Long]) is unchecked since it is eliminated by erasure
       jsonDF.select("orderId", "orderLines.quantity").map { case Row(oid: String, qs: Seq[Long]) => (oid, qs.sum) }.toDF("orderId", "sum").show
                                                                                       ^
+-------+---+
|orderId|sum|
+-------+---+
|    oi1|  4|
+-------+---+
scala> val mysum = udf { (ns: Seq[Long]) => ns.sum }
mysum: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(ArrayType(LongType,false))))

scala> jsonDF.select($"orderId", mysum($"orderLines.quantity") as "sum").show
+-------+---+
|orderId|sum|
+-------+---+
|    oi1|  4|
+-------+---+
scala> val _sum = (ns: Seq[Long]) => ns.sum
_sum: Seq[Long] => Long = <function1>

scala> val myudf = spark.udf.register("myudf", _sum)
myudf: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,LongType,Some(List(ArrayType(LongType,false))))

scala> sql("select mysum(col1) from values((Array(1L,2L,3L),2,3))").show
+---------------+
|UDF:mysum(col1)|
+---------------+
+---------------+
|              6|
+---------------+
scala> solution.toDF("orderId", "quantity").groupBy("orderId").agg(sum("quantity") as "sum").show
+-------+---+
|orderId|sum|
+-------+---+
|    oi1|  4|
+-------+---+

scala> val records = jsonDF.select("orderId", "orderLines.quantity").as[(String, Seq[Long])]
records: org.apache.spark.sql.Dataset[(String, Seq[Long])] = [orderId: string, quantity: array<bigint>]

scala> val solution = for {
     |   (oid, qs) <- records
     |   q <- qs
     | } yield (oid, q)
warning: there was one deprecation warning; re-run with -deprecation for details
solution: org.apache.spark.sql.Dataset[(String, Long)] = [_1: string, _2: bigint]
import org.apache.spark.sql.types._
val jsonSchema = new StructType().
  add($"orderId".string).
  add($"orderLines".array(
    new StructType().
      add($"productId".string).
      add($"quantity".long).
      add($"sequence".long).
      add($"totalPrice".struct(
        $"gross".long,
        $"net".long,
        $"tax".long
      ))
  ))
scala> jsons.withColumn("json", from_json($"json", jsonSchema)).show
+--------------------+
|                json|
+--------------------+
|[oi1,WrappedArray...|
+--------------------+
case class TotalPrice(gross: Long, net: Long, tax: Long)
case class OrderLine(
  productId: String,
  quantity: Long,
  sequence: Long,
  totalPrice: TotalPrice)
case class JsonSchema(
  orderId: String,
  orderLines: Seq[OrderLine])
import org.apache.spark.sql.Encoders
val jsonSchema = Encoders.product[JsonSchema].schema

scala> jsons.withColumn("json", from_json($"json", jsonSchema)).show
+--------------------+
|                json|
+--------------------+
|[oi1,WrappedArray...|
+--------------------+

Report an issue in 2.1.1

scala> val people = spark.read.option("header", true).csv("client-data/*.csv")
org.apache.spark.sql.AnalysisException: Path does not exist: file:/home/TC/Spark/spark-2.1.1-bin-hadoop2.6/client-data/*.csv;
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:377)
  at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$14.apply(DataSource.scala:370)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:370)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:415)
  at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:352)
  ... 54 elided

should be requesting schema for empty dataset

scala> val peopleSchema = spark.read.option("header", true).option("inferSchema", true).csv("client-data/people.csv").schema
peopleSchema: org.apache.spark.sql.types.StructType = StructType(StructField(id,IntegerType,true), StructField(name,StringType,true), StructField(city,StringType,true))

scala> peopleSchema.printTreeString
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- city: string (nullable = true)
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window

scala> val idGroups = Window.partitionBy($"id" % 2)
idGroups: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@47c16b3b

scala> input.withColumn("count", count("id") over idGroups).show
+---+-----+
| id|count|
+---+-----+
|  0|    2|
|  2|    2|
|  1|    2|
|  3|    2|
+---+-----+
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment