@johntbush
Last active August 18, 2019 06:10
beam stuff

Apache Beam pipelines

Experiments beyond Java to create pipelines that are semantically more familiar to SQL developers, functional programmers, and others with big data backgrounds.

The dream is that we can build pipelines in less time and make them easier to read. That delivers value faster and lowers our maintenance costs in the long run.

The best way to explain this is with an example. We take a simple made-up model of orders and refunds. An order can have 0 to N refunds, and a customer can have 0 to N orders. We want the total amount that a customer X has paid across all orders: the order amount total minus the refund total.
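
Before looking at any Beam code, it may help to pin down the expected result with a plain-Java sketch that uses no Beam at all. The class and field names here (`CustomerTotalSketch`, `Order`, `Refund`) are hypothetical stand-ins, and the sketch assumes refund amounts are stored as positive numbers that get subtracted. The pipeline snippets in this write-up add the refund sum instead, which gives the same answer only if refund amounts are stored as negative values.

```java
import java.util.Arrays;
import java.util.List;

class CustomerTotalSketch {
    // Hypothetical in-memory stand-in for an order record.
    static final class Order {
        final String orderId;
        final String customerId;
        final long orderAmt;
        Order(String orderId, String customerId, long orderAmt) {
            this.orderId = orderId;
            this.customerId = customerId;
            this.orderAmt = orderAmt;
        }
    }

    // Hypothetical in-memory stand-in for a refund record (amount assumed positive).
    static final class Refund {
        final String originalOrderId;
        final String customerId;
        final long refundAmt;
        Refund(String originalOrderId, String customerId, long refundAmt) {
            this.originalOrderId = originalOrderId;
            this.customerId = customerId;
            this.refundAmt = refundAmt;
        }
    }

    // Sum of the customer's order amounts minus the sum of the customer's refund amounts.
    static long totalPaid(String customerId, List<Order> orders, List<Refund> refunds) {
        long orderTotal = orders.stream()
            .filter(o -> o.customerId.equals(customerId))
            .mapToLong(o -> o.orderAmt)
            .sum();
        long refundTotal = refunds.stream()
            .filter(r -> r.customerId.equals(customerId))
            .mapToLong(r -> r.refundAmt)
            .sum();
        return orderTotal - refundTotal;
    }

    public static void main(String[] args) {
        List<Order> orders = Arrays.asList(
            new Order("o1", "a", 100L),
            new Order("o2", "a", 50L),
            new Order("o3", "b", 70L));
        List<Refund> refunds = Arrays.asList(
            new Refund("o1", "a", 30L),
            new Refund("o3", "b", 70L));
        System.out.println(totalPaid("a", orders, refunds)); // prints 120 (100 + 50 - 30)
    }
}
```

Every pipeline variant that follows should produce this number for the same input, which makes the sketch a handy baseline when comparing them.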

So what does that look like in Java?

PCollection<KV<String, LineItem>> filteredOrders =
        orders
            .apply(
                MapElements
                    .into(TypeDescriptor.of(LineItem.class))
                    .via((SerializableFunction<Order, LineItem>) order -> new LineItem(order.getOrder_id(),
                        order.getCustomer_id(), order.getOrder_amt())))
            .apply(Filter.by((SerializableFunction<LineItem, Boolean>) c -> c.getCustomerId().equals("a")))
            .apply(WithKeys.of(LineItem::getOrderId).withKeyType(TypeDescriptor.of(String.class)));

    PCollection<KV<String, LineItem>> mappedRefunds =
        refunds
            .apply(MapElements.into(TypeDescriptor.of(LineItem.class))
                .via((SerializableFunction<Refund, LineItem>) refund -> new LineItem(refund.getOriginal_order_id(),
                    refund.getCustomer_id(), refund.getRefund_amt())))
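            // Keep only this customer's refunds; CoGroupByKey would otherwise also emit
            // refund-only keys that belong to other customers' orders.
            .apply(Filter.by((SerializableFunction<LineItem, Boolean>) c -> c.getCustomerId().equals("a")))
            .apply(WithKeys.of(LineItem::getOrderId).withKeyType(TypeDescriptor.of(String.class)))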
            .apply(Combine.perKey((SerializableFunction<Iterable<LineItem>, LineItem>) input -> {
              LineItem first = Iterables.get(input, 0);
              String orderId = first.getOrderId();
              String customerId = first.getCustomerId();
              Long total =
                  StreamSupport
                      .stream(input.spliterator(), false)
                      .map((line1) -> line1.getAmount())
                      .reduce((line1, line2) -> line1 + line2)
                      .orElse(0L);
              return new LineItem(orderId, customerId, total);
            }));

    final TupleTag<LineItem> t1 = new TupleTag<>();
    final TupleTag<LineItem> t2 = new TupleTag<>();
    PCollection<KV<String, CoGbkResult>> groupedResult =
        KeyedPCollectionTuple.of(t1, filteredOrders)
            .and(t2, mappedRefunds)
            .apply(CoGroupByKey.create());

    groupedResult.apply(ParDo.of(
        new DoFn<KV<String, CoGbkResult>, Long>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            KV<String, CoGbkResult> e = c.element();
            Optional<Long> ordersSum =
                StreamSupport
                    .stream(e.getValue().getAll(t1).spliterator(), true)
                    .map((line1) -> line1.getAmount())
                    .reduce((line1, line2) -> line1 + line2);
            Optional<Long> refundsSum =
                StreamSupport
                    .stream(e.getValue().getAll(t2).spliterator(), true)
                    .map((line1) -> line1.getAmount())
                    .reduce((line1, line2) -> line1 + line2);
            Long total = refundsSum.orElse(0L) + ordersSum.orElse(0L);
            c.output(total);
          }
        }))
        .apply(Combine.globally(Sum.ofLongs()));
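
The explicit `TypeDescriptor.of(LineItem.class)` arguments in the snippet above come down to type erasure: generic type arguments are not available at runtime, so a lambda on its own often does not reveal its output type, and Beam needs that type to pick a coder for the output elements. Here is a minimal plain-Java sketch of the effect, with no Beam dependency. `describeOutput` is a hypothetical helper for illustration, not a Beam API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class TypeErasureSketch {
    // Returns true when two lists with different element types share one runtime class.
    static boolean sameRuntimeClass() {
        List<String> strings = new ArrayList<>();
        List<Long> longs = new ArrayList<>();
        return strings.getClass() == longs.getClass();
    }

    // Hypothetical stand-in for a map-style transform: T is erased at runtime, so the
    // caller passes the output class explicitly when the transform needs to know it.
    static <S, T> String describeOutput(Function<S, T> fn, Class<T> outputType) {
        return outputType.getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(sameRuntimeClass()); // prints true
        Function<String, Long> parse = Long::parseLong;
        System.out.println(describeOutput(parse, Long.class)); // prints Long
    }
}
```

Passing `Long.class` here plays the same role as the `TypeDescriptor` passed to `MapElements.into(...)`.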

Ok, yikes! What is going on there? That sure seems like a lot of code for a simple problem. Why does the map want me to declare a type? This is a strongly typed language, can't it figure that out itself? Oh yeah, type erasure. Thanks, Java. And what is with all these apply() calls? What happened to my map()? I wonder if we can do better. What does that look like in Beam SQL?

WITH refund_agg
     AS (SELECT rr.original_order_id AS original_order_id,
                Sum(rr.refund_amt)   AS refund_amt
         FROM   refunds rr
         GROUP  BY rr.original_order_id)
SELECT oo.customer_id,
       Sum(COALESCE(rr.refund_amt, 0) + oo.order_amt)
FROM   orders oo
       LEFT JOIN refund_agg rr
              ON oo.order_id = rr.original_order_id
WHERE  oo.customer_id = 'a'
GROUP  BY oo.customer_id

Ok, that looks like a lot less code. But I wonder how an IDE can help me when I write that. How do I get feedback if my syntax is wrong?

I've heard Scala is a major language in big data. In fact, Spark is written mostly in Scala, and Kafka and Flink both include Scala code alongside Java.
Dang, and Scala was released in 2004?! Wow, it is a fusion of object-oriented and functional approaches and supports asynchronous programming as well? That's like Node.js, Java, and Haskell all rolled into one. Wow, I should check that out.

Yeah, you should. In fact, a little company called Spotify thinks Scala is pretty cool. They use Dataflow and decided they could improve on the Apache Beam API with a little library called Scio. What does our little experiment look like in that?

package example.scala

import com.spotify.scio._
import com.spotify.scio.extra.json._

case class Orders(orders: List[Order])
case class Order(order_id:String, customer_id:String, order_amt:Long)

case class Refunds(refunds: List[Refund])
case class Refund(refund_order_id:String, original_order_id:String, customer_id:String,
                  refund_amt:Long)

object OrderSum {
  def main(cmdlineArgs: Array[String]): Unit = {
    // Pass the command-line arguments through so flags such as --runner are honored.
    val (sc, args) = ContextAndArgs(cmdlineArgs)
    val customerId = "a"
    val orders = sc.jsonFile[Orders]("src/test/resources/data.json")
      .flatMap(_.orders)
      .filter(_.customer_id == customerId)
      .keyBy(_.order_id)
    val refunds = sc.jsonFile[Refunds]("src/test/resources/data.json")
      .flatMap(_.refunds)
      .groupBy(_.original_order_id)
      .map { case (key, refunds) =>
        (key, refunds.foldLeft(0L)(_ + _.refund_amt))
      }

    orders.leftOuterJoin(refunds)
      .map { case (_,(order, refundOpt)) =>
        refundOpt.fold(order.order_amt)(_ + order.order_amt)
      }
      .reduce(_ + _)
      .map { x =>
        println(x.toString)
        x.toString
      }
    val result = sc.close().waitUntilFinish()
  }
}

Yep, that's it. That even did the parsing, which wasn't part of the earlier examples, just because, well, it seemed unfair to make Java look so bad.
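
For readers coming from Java, the least familiar line in the Scio version is probably `refundOpt.fold(order.order_amt)(_ + order.order_amt)` after the left outer join. A rough Java analogue, sketched with `java.util.Optional` and a hypothetical `netAmount` helper, could look like the following. It assumes, as the pipeline snippets do, that the refund total is added to the order amount, so a refund of 30 is represented here as -30.

```java
import java.util.Optional;

class LeftJoinFoldSketch {
    // Mirrors refundOpt.fold(orderAmt)(_ + orderAmt): return the order amount alone when
    // no refund matched the order, otherwise add the refund total to it.
    static long netAmount(long orderAmt, Optional<Long> refundTotal) {
        return refundTotal.map(refund -> refund + orderAmt).orElse(orderAmt);
    }

    public static void main(String[] args) {
        System.out.println(netAmount(100L, Optional.empty()));  // prints 100
        System.out.println(netAmount(100L, Optional.of(-30L))); // prints 70
    }
}
```

The empty case is what the left outer join produces for an order with no refunds, which is the same job `COALESCE(rr.refund_amt, 0)` does in the SQL version.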

Ok, that's cool, but I'm really into whatever the latest thing is, and I heard Google is going into Kotlin pretty hard, and I think Google is wicked cool. Kotlin was first unveiled in 2011, is slowly catching up to Scala, and has a lot of traction right now, so let's look at that:

   val pOrders =
       p.fromText(path = "src/test/resources/orders.json")
           .parDo<String, Order>{
               try {
                 output(gson.fromJson(element, Order::class.java))
               } catch (e: IOException) {
                 throw RuntimeException(e.message, e)
               }
             }

   val pRefunds =
       p.fromText(path = "src/test/resources/refunds.json")
           .parDo<String, Refund> {
             try {
               output(gson.fromJson(element, Refund::class.java))
             } catch (e: IOException) {
               throw RuntimeException(e.message, e)
             }
           }

   val filteredOrders =
       pOrders.filter { it.customer_id == "a" }
           .keyBy { it.order_id }

   val mappedRefunds =
       pRefunds.filter { it.customer_id == "a" }
           .groupBy { it.original_order_id }
           .map {
             val first = it.value.first()
             val total = it.value.asSequence().fold(0L) { amount, refund -> amount + refund.refund_amt }
             KV.of(it.key, RefundRollup(first.original_order_id, first.customer_id, total))
           }

   p.coGroupByKey(filteredOrders, mappedRefunds) { _, orders, refunds ->
         val ordersSum = orders
             .map { it.order_amt }
             .fold(0L, { line1, line2 -> line1 + line2 })
         val refundsSum = refunds
             .map { line1 -> line1.amount }
             .fold(0L, { line1, line2 -> line1 + line2 })
         listOf(refundsSum, ordersSum)
       }
       .sumLongs()
       .parDo<Long, Void>{println(element)}

Not bad, that looks pretty similar to the Scala. Oh look, all those nasty .apply() calls are gone. I like that. So did you find a library to do that for you?

Nope, I had to make one myself, but I found another person who had started one, so it's definitely possible.

So GoDaddy could make an open source library similar to Scio but for Kotlin, and get some props for going all big data with Kotlin?

Yes, for sure, that is a possibility. But first, GoDaddy has to move past Java...

Setup Dev Env:

brew install sbt

Features:

This project comes with a number of preconfigured features, including:

sbt-pack

Use sbt-pack instead of sbt-assembly to:

  • reduce build time
  • enable efficient dependency caching
  • reduce job submission time

To build the package, run:

sbt pack

Running Examples

Scio (Scala example)

sbt "runMain example.scala.OrderSum --runner=DirectRunner"

Beam SQL (Java)

sbt "runMain example.java.OrderSumSql --runner=DirectRunner"

Kotlin

sbt "runMain example.kotlin.OrderSum --runner=DirectRunner"

Testing

This template comes with an example test. To run the tests:

sbt test

Scala style

The style configuration lives in scalastyle-config.xml. To enforce the style, run:

sbt scalastyle

REPL

To experiment with the current codebase in the Scio REPL, run:

sbt repl/run

This project is based on the scio.g8 template.
