Skip to content

Instantly share code, notes, and snippets.

@johntbush
Created August 13, 2019 02:27
Show Gist options
  • Save johntbush/957adba3160d3e72f4c1b4c682824c47 to your computer and use it in GitHub Desktop.
Save johntbush/957adba3160d3e72f4c1b4c682824c47 to your computer and use it in GitHub Desktop.
scala beam sample
package example.scala
import com.spotify.scio._
import com.spotify.scio.extra.json._
/** Top-level wrapper for a batch of [[Order]] records.
  *
  * Read with `sc.jsonFile[Orders]` in `OrderSum`; presumably mirrors a JSON object carrying an
  * `orders` array (confirm against the input data).
  *
  * @param orders the orders contained in one input record
  */
final case class Orders(orders: List[Order])

/** A single customer order.
  *
  * NOTE(review): field names are snake_case, presumably because they must match the JSON keys
  * of the input; do not rename them without checking the decoder.
  *
  * @param order_id    identifier of the order
  * @param customer_id identifier of the customer who placed the order
  * @param order_amt   order amount (unit not visible here)
  */
final case class Order(
    order_id: String,
    customer_id: String,
    order_amt: Long
)

/** Top-level wrapper for a batch of [[Refund]] records.
  *
  * Read with `sc.jsonFile[Refunds]` in `OrderSum`; presumably mirrors a JSON object carrying a
  * `refunds` array (confirm against the input data).
  *
  * @param refunds the refunds contained in one input record
  */
final case class Refunds(refunds: List[Refund])

/** A refund recorded against an earlier order.
  *
  * @param refund_order_id   identifier of the refund itself
  * @param original_order_id identifier of the [[Order]] being refunded
  * @param customer_id       identifier of the customer receiving the refund
  * @param refund_amt        refund amount (sign convention not visible here)
  */
final case class Refund(
    refund_order_id: String,
    original_order_id: String,
    customer_id: String,
    refund_amt: Long
)
/** Sample Scio pipeline that totals the order amounts of a single customer.
  *
  * Reads `Orders` and `Refunds` records from the same JSON input, keeps the orders of one
  * customer, joins every order with the summed refund amount recorded against it, reduces the
  * per-order values to one grand total and prints that total.
  *
  * Optional command-line arguments (the defaults are the values that used to be hard-coded):
  *  - `--customerId=<id>` customer whose orders are totalled (default `a`)
  *  - `--input=<path>` JSON file to read (default `src/test/resources/data.json`)
  */
object OrderSum {

  /** Customer used when `--customerId` is not supplied. */
  private val DefaultCustomerId = "a"

  /** Input file used when `--input` is not supplied. */
  private val DefaultInput = "src/test/resources/data.json"

  /** Builds the pipeline, runs it and blocks until it has finished.
    *
    * @param cmdlineArgs raw command-line arguments, forwarded to Scio for parsing
    */
  def main(cmdlineArgs: Array[String]): Unit = {
    // Forward the real command line: an empty array was passed before, so every option given
    // by the caller was silently dropped.
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    val customerId = args.getOrElse("customerId", DefaultCustomerId)
    val input = args.getOrElse("input", DefaultInput)

    // Orders of the selected customer, keyed by order id for the join below.
    // `==` is null-safe, unlike `.equals`, should a record arrive without a customer id.
    val orders = sc
      .jsonFile[Orders](input)
      .flatMap(_.orders)
      .filter(_.customer_id == customerId)
      .keyBy(_.order_id)

    // Total refund amount per original order id. Summing per key avoids first gathering every
    // refund of an order into one collection, as groupBy followed by a fold did.
    val refunds = sc
      .jsonFile[Refunds](input)
      .flatMap(_.refunds)
      .map(refund => (refund.original_order_id, refund.refund_amt))
      .sumByKey

    orders
      .leftOuterJoin(refunds)
      .map { case (_, (order, refundOpt)) =>
        // Orders without any refund contribute their plain amount.
        // NOTE(review): the refund total is ADDED to the order amount, exactly as before. That
        // is only a net amount if refund_amt is stored as a negative number - confirm against
        // the data; if refunds are positive this should subtract instead.
        order.order_amt + refundOpt.getOrElse(0L)
      }
      .reduce(_ + _)
      .map { total =>
        val rendered = total.toString
        println(rendered)
        rendered
      }

    // Run the pipeline and wait for completion; the returned state is not needed here.
    sc.close().waitUntilFinish()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment