Experiments beyond Java for building Apache Beam pipelines that feel more familiar to SQL developers, functional programmers, and others with big data backgrounds.
The hope is that we can build pipelines in less time and make them easier to read, which brings value faster and lowers our maintenance costs in the long run.
The best way to explain this is with an example. Take a simple, made-up model of orders and refunds: an order can have 0 to N refunds, and a customer can have 0 to N orders. We want the total amount customer X has paid across all orders, that is, the order amount total minus the refund total.
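To pin down what "total paid" means before any pipeline code, here is a minimal plain-Java sketch with no Beam involved. The record and field names and the sample data are hypothetical. It follows the definition above and assumes refund amounts are stored as positive numbers that get subtracted. The pipeline examples in this README add the refund total instead, which matches this definition only if refunds are stored as negative amounts.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CustomerTotal {
    // Hypothetical in-memory versions of the README's Order and Refund records.
    record Order(String orderId, String customerId, long orderAmt) {}
    record Refund(String refundOrderId, String originalOrderId, String customerId, long refundAmt) {}

    // Total paid by one customer: that customer's order amounts minus the refunds
    // issued against those orders. Assumes refund amounts are positive numbers.
    static long totalPaid(String customerId, List<Order> orders, List<Refund> refunds) {
        // Refund total per original order id (an order has 0..N refunds).
        Map<String, Long> refundsByOrder = new HashMap<>();
        for (Refund r : refunds) {
            refundsByOrder.merge(r.originalOrderId(), r.refundAmt(), Long::sum);
        }
        // Each of the customer's orders contributes its amount minus its refunds, if any.
        long total = 0L;
        for (Order o : orders) {
            if (o.customerId().equals(customerId)) {
                total += o.orderAmt() - refundsByOrder.getOrDefault(o.orderId(), 0L);
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(
            new Order("o1", "a", 100L),
            new Order("o2", "a", 50L),
            new Order("o3", "b", 70L));
        List<Refund> refunds = List.of(
            new Refund("r1", "o1", "a", 20L),
            new Refund("r2", "o1", "a", 5L),
            new Refund("r3", "o3", "b", 70L));
        // (100 - 25) + 50 = 125; customer b's order and refund are ignored.
        System.out.println(totalPaid("a", orders, refunds)); // prints 125
    }
}
```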
So what does that look like as a Beam pipeline in Java?
PCollection<KV<String, LineItem>> filteredOrders =
orders
.apply(
MapElements
.into(TypeDescriptor.of(LineItem.class))
.via((SerializableFunction<Order, LineItem>) order -> new LineItem(order.getOrder_id(),
order.getCustomer_id(), order.getOrder_amt())))
.apply(Filter.by((SerializableFunction<LineItem, Boolean>) c -> c.getCustomerId().equals("a")))
.apply(WithKeys.of(LineItem::getOrderId).withKeyType(TypeDescriptor.of(String.class)));
PCollection<KV<String, LineItem>> mappedRefunds =
refunds
.apply(MapElements.into(TypeDescriptor.of(LineItem.class))
.via((SerializableFunction<Refund, LineItem>) refund -> new LineItem(refund.getOriginal_order_id(),
refund.getCustomer_id(), refund.getRefund_amt())))
.apply(WithKeys.of(LineItem::getOrderId).withKeyType(TypeDescriptor.of(String.class)))
.apply(Combine.perKey((SerializableFunction<Iterable<LineItem>, LineItem>) input -> {
LineItem first = Iterables.get(input, 0);
String orderId = first.getOrderId();
String customerId = first.getCustomerId();
Long total =
StreamSupport
.stream(input.spliterator(), false)
.map((line1) -> line1.getAmount())
.reduce((line1, line2) -> line1 + line2)
.orElse(0L);
return new LineItem(orderId, customerId, total);
}));
final TupleTag<LineItem> t1 = new TupleTag<>();
final TupleTag<LineItem> t2 = new TupleTag<>();
PCollection<KV<String, CoGbkResult>> groupedResult =
KeyedPCollectionTuple.of(t1, filteredOrders)
.and(t2, mappedRefunds)
.apply(CoGroupByKey.create());
groupedResult.apply(ParDo.of(
new DoFn<KV<String, CoGbkResult>, Long>() {
@ProcessElement
public void processElement(ProcessContext c) {
KV<String, CoGbkResult> e = c.element();
Optional<Long> ordersSum =
StreamSupport
.stream(e.getValue().getAll(t1).spliterator(), false)
.map((line1) -> line1.getAmount())
.reduce((line1, line2) -> line1 + line2);
// Only orders were filtered to customer "a", so a key with refunds but no orders
// belongs to another customer's order: skip it (left-join semantics).
if (!ordersSum.isPresent()) {
return;
}
Optional<Long> refundsSum =
StreamSupport
.stream(e.getValue().getAll(t2).spliterator(), false)
.map((line1) -> line1.getAmount())
.reduce((line1, line2) -> line1 + line2);
Long total = refundsSum.orElse(0L) + ordersSum.get();
c.output(total);
}
}))
.apply(Combine.globally(Sum.ofLongs()));
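CoGroupByKey emits one result for every key that appears in either input, so a key that only has refunds still reaches the DoFn with an empty orders side. Because only the orders were filtered to customer "a", such keys are refunds against other customers' orders, and a DoFn that sums blindly would count them. The plain-Java sketch below mimics the grouping with maps to make that visible. `CoGroupSketch`, `Grouped`, and the sample data are hypothetical stand-ins, not Beam types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class CoGroupSketch {
    // Everything from each input that shares one key; loosely analogous to a
    // CoGbkResult read with two tags (t1 = orders, t2 = refunds).
    record Grouped(List<Long> orderAmts, List<Long> refundAmts) {}

    private static Grouped empty(String key) {
        return new Grouped(new ArrayList<>(), new ArrayList<>());
    }

    // Groups two keyed inputs by key. As with CoGroupByKey, a key appears in the
    // result if it occurs in EITHER input, so one side of a group may be empty.
    static Map<String, Grouped> coGroup(List<Map.Entry<String, Long>> orders,
                                        List<Map.Entry<String, Long>> refunds) {
        Map<String, Grouped> out = new TreeMap<>(); // sorted keys for readable output
        for (Map.Entry<String, Long> e : orders) {
            out.computeIfAbsent(e.getKey(), CoGroupSketch::empty).orderAmts().add(e.getValue());
        }
        for (Map.Entry<String, Long> e : refunds) {
            out.computeIfAbsent(e.getKey(), CoGroupSketch::empty).refundAmts().add(e.getValue());
        }
        return out;
    }

    // Keys that only came from the refunds side, i.e. refunds whose order was
    // filtered out upstream (another customer's order).
    static List<String> keysWithoutOrders(Map<String, Grouped> grouped) {
        List<String> keys = new ArrayList<>();
        grouped.forEach((key, g) -> {
            if (g.orderAmts().isEmpty()) {
                keys.add(key);
            }
        });
        return keys;
    }

    public static void main(String[] args) {
        // Orders already filtered to customer "a"; refunds are not filtered.
        Map<String, Grouped> grouped = coGroup(
            List.of(Map.entry("o1", 100L), Map.entry("o2", 50L)),
            List.of(Map.entry("o1", 20L), Map.entry("o1", 5L), Map.entry("o3", 70L)));
        System.out.println(grouped.keySet());           // prints [o1, o2, o3]
        System.out.println(keysWithoutOrders(grouped)); // prints [o3]
    }
}
```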
OK, yikes! What is going on there? That sure seems like a lot of code for a simple problem.
Why does the map want me to declare a type? This is a strongly typed language, can't you figure
that out yourself? Oh yeah, type erasure, thanks Java. And what's with all these apply()s? What
happened to my map()?
I wonder if we can do better. What does that look like in Beam SQL?
WITH refund_agg
AS (SELECT rr.original_order_id AS original_order_id,
Sum(rr.refund_amt) AS refund_amt
FROM refunds rr
GROUP BY rr.original_order_id)
SELECT oo.customer_id,
Sum(COALESCE(rr.refund_amt, 0) + oo.order_amt)
FROM orders oo
LEFT JOIN refund_agg rr
ON oo.order_id = rr.original_order_id
WHERE oo.customer_id = 'a'
GROUP BY oo.customer_id
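The WITH clause matters here: it collapses refunds to one row per order before the join. Joining orders directly to raw refunds would repeat an order once per matching refund and inflate the order total. The plain-Java sketch below mimics both joins to show the difference; the class, record names, and sample data are hypothetical, and no Beam or SQL engine is involved.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class JoinFanOut {
    record Order(String orderId, long orderAmt) {}
    record Refund(String originalOrderId, long refundAmt) {}
    record Row(String orderId, long orderAmt, long refundAmt) {}

    // LEFT JOIN orders to refund rows on order id. An unmatched order keeps one
    // row with a refund of 0, like COALESCE(rr.refund_amt, 0) in the query.
    static List<Row> leftJoin(List<Order> orders, List<Refund> refunds) {
        List<Row> rows = new ArrayList<>();
        for (Order o : orders) {
            boolean matched = false;
            for (Refund r : refunds) {
                if (r.originalOrderId().equals(o.orderId())) {
                    rows.add(new Row(o.orderId(), o.orderAmt(), r.refundAmt()));
                    matched = true;
                }
            }
            if (!matched) {
                rows.add(new Row(o.orderId(), o.orderAmt(), 0L));
            }
        }
        return rows;
    }

    // One refund row per order, like the refund_agg CTE (GROUP BY original_order_id).
    static List<Refund> aggregate(List<Refund> refunds) {
        Map<String, Long> sums = new LinkedHashMap<>();
        for (Refund r : refunds) {
            sums.merge(r.originalOrderId(), r.refundAmt(), Long::sum);
        }
        List<Refund> out = new ArrayList<>();
        sums.forEach((id, amt) -> out.add(new Refund(id, amt)));
        return out;
    }

    static long sumOrderAmt(List<Row> rows) {
        return rows.stream().mapToLong(Row::orderAmt).sum();
    }

    public static void main(String[] args) {
        List<Order> orders = List.of(new Order("o1", 100L), new Order("o2", 50L));
        List<Refund> refunds = List.of(new Refund("o1", 20L), new Refund("o1", 5L));
        // Raw join: o1 appears twice, so its 100 is counted twice.
        System.out.println(sumOrderAmt(leftJoin(orders, refunds)));            // prints 250
        // Pre-aggregated join: exactly one row per order.
        System.out.println(sumOrderAmt(leftJoin(orders, aggregate(refunds)))); // prints 150
    }
}
```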
OK, that looks like a lot less code. But how can an IDE help me when I write it? How do I get feedback when my syntax is wrong?
I've heard Scala is a major language in big data; in fact, Spark and much of Kafka are written in Scala, and Flink has offered a Scala API.
Dang, and Scala was released in 2004?! Wow, it is a fusion of object-oriented and functional approaches
and supports asynchronous programming as well? That's like Node.js, Java, and Haskell all rolled into one.
Wow, I should check that out.
Yeah, you should. In fact, a little company called Spotify thinks Scala is pretty cool, uses Dataflow, and decided it could improve on the Apache Beam API with a little library called Scio. What does our little experiment look like in that?
package example.scala
import com.spotify.scio._
import com.spotify.scio.extra.json._
case class Orders(orders: List[Order])
case class Order(order_id:String, customer_id:String, order_amt:Long)
case class Refunds(refunds: List[Refund])
case class Refund(refund_order_id:String, original_order_id:String, customer_id:String,
refund_amt:Long)
object OrderSum {
def main(cmdlineArgs: Array[String]): Unit = {
// Pass the real command-line args so options such as --runner=DirectRunner are honored
val (sc, args) = ContextAndArgs(cmdlineArgs)
val customerId = "a"
val orders = sc.jsonFile[Orders]("src/test/resources/data.json")
.flatMap(_.orders)
.filter(_.customer_id.equals(customerId))
.keyBy(_.order_id)
val refunds = sc.jsonFile[Refunds]("src/test/resources/data.json")
.flatMap(_.refunds)
.groupBy(_.original_order_id)
.map { case (key, refunds) =>
(key, refunds.foldLeft(0L)(_ + _.refund_amt))
}
orders.leftOuterJoin(refunds)
.map { case (_,(order, refundOpt)) =>
refundOpt.fold(order.order_amt)(_ + order.order_amt)
}
.reduce(_ + _)
.map { x =>
println(x.toString)
x.toString
}
val result = sc.close().waitUntilFinish()
}
}
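For readers coming from Java, the two folds in the Scio example may be the least obvious lines. This sketch spells out what they compute using `Stream.reduce` and `Optional`. The class and method names are hypothetical, and it mirrors the Scala exactly as written, including adding the refund total to the order amount.

```java
import java.util.List;
import java.util.Optional;

final class OptionFoldSketch {
    // Scala: refunds.foldLeft(0L)(_ + _.refund_amt)
    // Start at 0 and add each refund amount in turn.
    static long refundTotal(List<Long> refundAmts) {
        return refundAmts.stream().reduce(0L, Long::sum);
    }

    // Scala: refundOpt.fold(order.order_amt)(_ + order.order_amt)
    // Option.fold(ifEmpty)(f): no refund gives the order amount;
    // a refund total r gives r + order amount.
    static long lineTotal(long orderAmt, Optional<Long> refundTotal) {
        return refundTotal.map(r -> r + orderAmt).orElse(orderAmt);
    }

    public static void main(String[] args) {
        System.out.println(lineTotal(100L, Optional.of(refundTotal(List.of(20L, 5L))))); // prints 125
        System.out.println(lineTotal(50L, Optional.empty()));                              // prints 50
    }
}
```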
Yep, that's it. It even does the JSON parsing, which the earlier examples left out just because, well, it seemed unfair to make Java look that bad.
OK, that's cool, but I'm really into whatever the latest thing is, and I heard Google is going into Kotlin pretty hard, and I think Google is wicked cool. Kotlin was unveiled in 2011, is slowly catching up to Scala, and has a lot of traction right now, so let's look at that:
val pOrders =
p.fromText(path = "src/test/resources/orders.json")
.parDo<String, Order> {
// Gson reports malformed JSON with unchecked exceptions (e.g. JsonSyntaxException),
// never IOException, so a bad line already fails the bundle without a try/catch.
output(gson.fromJson(element, Order::class.java))
}
val pRefunds =
p.fromText(path = "src/test/resources/refunds.json")
.parDo<String, Refund> {
output(gson.fromJson(element, Refund::class.java))
}
val filteredOrders =
pOrders.filter { it.customer_id == "a" }
.keyBy { it.order_id }
val mappedRefunds =
pRefunds.filter { it.customer_id == "a" }
.groupBy { it.original_order_id }
.map {
val first = it.value.first()
val total = it.value.asSequence().fold(0L) { amount, refund -> amount + refund.refund_amt }
KV.of(it.key, RefundRollup(first.original_order_id, first.customer_id, total))
}
p.coGroupByKey(filteredOrders, mappedRefunds) { _, orders, refunds ->
val ordersSum = orders
.map { it.order_amt }
.fold(0L, { line1, line2 -> line1 + line2 })
val refundsSum = refunds
.map { line1 -> line1.amount }
.fold(0L, { line1, line2 -> line1 + line2 })
listOf(refundsSum, ordersSum)
}
.sumLongs()
.parDo<Long, Void>{println(element)}
Not bad, that looks pretty similar to the Scala. Oh look, all those nasty apply()s are gone. I like that. So did
you find a library to do that for you?
Nope, I had to write one myself, but I found someone else who had started one, so it's definitely possible.
So GoDaddy could make an open source library similar to Scio, but for Kotlin, and get some props for going all big data with Kotlin?
Yes, for sure, that is a possibility. But first GoDaddy has to move past Java...
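The wrapper library mentioned above is what removes the apply() calls from the Kotlin example. Its code isn't shown here, and in Kotlin it would presumably use extension functions; the Java sketch below only illustrates the underlying adapter idea. `Coll`, `mapElements`, `filterBy`, and `Fluent` are hypothetical toy types, not Beam classes: map() and filter() simply build a transform and call apply() on the caller's behalf.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

final class FluentWrapperSketch {
    // Toy stand-in for an apply()-style collection: the only way to transform it
    // is to pass a transform object to apply(). This is NOT Beam's PCollection.
    static final class Coll<T> {
        final List<T> items;
        Coll(List<T> items) { this.items = List.copyOf(items); }
        <R> Coll<R> apply(Function<Coll<T>, Coll<R>> transform) {
            return transform.apply(this);
        }
    }

    // Hypothetical transform factories, in the spirit of MapElements and Filter.
    static <T, R> Function<Coll<T>, Coll<R>> mapElements(Function<T, R> fn) {
        return in -> {
            List<R> out = new ArrayList<>();
            for (T item : in.items) {
                out.add(fn.apply(item));
            }
            return new Coll<>(out);
        };
    }

    static <T> Function<Coll<T>, Coll<T>> filterBy(Predicate<T> keep) {
        return in -> {
            List<T> out = new ArrayList<>();
            for (T item : in.items) {
                if (keep.test(item)) {
                    out.add(item);
                }
            }
            return new Coll<>(out);
        };
    }

    // The wrapper: map() and filter() build the transform and call apply() for you.
    static final class Fluent<T> {
        private final Coll<T> coll;
        Fluent(Coll<T> coll) { this.coll = coll; }
        <R> Fluent<R> map(Function<T, R> fn) { return new Fluent<>(coll.apply(mapElements(fn))); }
        Fluent<T> filter(Predicate<T> keep) { return new Fluent<>(coll.apply(filterBy(keep))); }
        List<T> toList() { return coll.items; }
    }

    public static void main(String[] args) {
        Coll<Integer> amounts = new Coll<>(List.of(3, 10, 7));
        // apply() style
        List<String> viaApply = amounts
            .apply(filterBy((Integer n) -> n > 5))
            .apply(mapElements((Integer n) -> "amt=" + n))
            .items;
        // Fluent wrapper style: same result, no apply() at the call site
        List<String> viaWrapper = new Fluent<>(amounts).filter(n -> n > 5).map(n -> "amt=" + n).toList();
        System.out.println(viaApply);   // prints [amt=10, amt=7]
        System.out.println(viaWrapper); // prints [amt=10, amt=7]
    }
}
```

Each fluent method is a thin shim over a single apply(), so the underlying API stays untouched and the wrapper only changes how calls read.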
To install sbt on macOS with Homebrew:
brew install sbt
This project comes with a number of preconfigured features, including:
Use sbt-pack instead of sbt-assembly to:
- reduce build time
- enable efficient dependency caching
- reduce job submission time
To build the package, run:
sbt pack
Scio (Scala example)
sbt "runMain example.scala.OrderSum --runner=DirectRunner"
Beam SQL (Java)
sbt "runMain example.java.OrderSumSql --runner=DirectRunner"
Kotlin
sbt "runMain example.kotlin.OrderSum --runner=DirectRunner"
This template comes with an example test. To run tests:
sbt test
Find the style configuration in scalastyle-config.xml. To enforce style, run:
sbt scalastyle
To experiment with the current codebase in the Scio REPL, simply run:
sbt repl/run
This project is based on the scio.g8 template.