Skip to content

Instantly share code, notes, and snippets.

Last active December 14, 2017 22:35
Show Gist options
  • Save regadas/7371b54721549151f3e1ca31b0dff780 to your computer and use it in GitHub Desktop.
Save regadas/7371b54721549151f3e1ca31b0dff780 to your computer and use it in GitHub Desktop.
* Copyright 2017 Spotify AB.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package com.spotify.scio.bigtable
import java.{lang, util}
import java.util.UUID
import java.util.concurrent.{Callable, ConcurrentHashMap, ConcurrentLinkedQueue, ConcurrentMap}
import{BigtableDataClient, BigtableSession}
import{Cache, CacheStats}
import{FutureCallback, Futures, ListenableFuture}
import com.spotify.scio.bigtable.BigtableDoFn._
import org.apache.beam.sdk.transforms.DoFn
import org.apache.beam.sdk.transforms.DoFn.{FinishBundle, ProcessElement, StartBundle}
import org.apache.beam.sdk.transforms.windowing.BoundedWindow
import org.apache.beam.sdk.values.{KV, ValueInSingleWindow}
import simulacrum._
import scala.collection.JavaConverters._
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future, Promise}
import scala.language.implicitConversions
/**
 * Companion of the BigtableDoFn case class: type aliases describing a lookup, two factory
 * methods, and an enrichment that bridges a Guava ListenableFuture to a Scala Future.
 *
 * NOTE(review): this listing appears to have lost closing braces and a few expressions
 * (it looks like a web-page extraction). The comments below describe only what is visible.
 */
object BigtableDoFn {
import Resource._
// A lookup function: given a Bigtable data client and an input, yields a Future of the output.
type LookupFn[I, O] = (BigtableDataClient, I) => Future[O]
// An input paired with either the Throwable of a failed lookup or the looked-up value.
type Lookup[I, O] = KV[I, Either[Throwable, O]]
// A Lookup wrapped in a ValueInSingleWindow.
type WindowLookup[I, O] = ValueInSingleWindow[Lookup[I, O]]
/**
 * Builds a BigtableDoFn whose cache builder creates a NoOpCache.
 *
 * @param options  options passed to the BigtableSession constructor
 * @param lookupFn lookup function handed to the BigtableDoFn constructor
 * @return a BigtableDoFn built from the session function, cache function and lookupFn
 */
def apply[I, O](options: BigtableOptions)(lookupFn: LookupFn[I, O]): BigtableDoFn[I, O] = {
// NOTE(review): the receiver of `.get` below is not visible in this listing; presumably
// `Resource[BigtableSession]`, mirroring cacheFn - confirm against the original file.
val sessionFn = (uuid: UUID) =>
.get(uuid.toString, () => new BigtableSession(options))
// Asks the Resource instance for Cache[I, O] for the entry keyed by the UUID's string form;
// the builder passed along creates a NoOpCache.
val cacheFn = (uuid: UUID) =>
Resource[Cache[I, O]]
.get(uuid.toString, () => new NoOpCache[I, O]())
BigtableDoFn[I, O](sessionFn, cacheFn)(lookupFn)
/**
 * Builds a BigtableDoFn whose cache builder returns the caller-supplied cache.
 *
 * @param options  options passed to the BigtableSession constructor
 * @param cache    cache instance returned by the cache builder
 * @param lookupFn lookup function handed to the BigtableDoFn constructor
 * @return a BigtableDoFn built from the session function, cache function and lookupFn
 */
def apply[I, O](options: BigtableOptions, cache: Cache[I, O])(
lookupFn: LookupFn[I, O]): BigtableDoFn[I, O] = {
// NOTE(review): as in the overload above, the receiver of `.get` is not visible here.
val sessionFn = (uuid: UUID) =>
.get(uuid.toString, () => new BigtableSession(options))
// Same keying as above, but the builder hands back the `cache` argument.
val cacheFn = (uuid: UUID) =>
Resource[Cache[I, O]]
.get(uuid.toString, () => cache)
BigtableDoFn[I, O](sessionFn, cacheFn)(lookupFn)
// probably move this to Utils?
/** Adds `asScala` to a Guava ListenableFuture. */
implicit class RichListenableFuture[T](lf: ListenableFuture[T]) {
/**
 * Registers a callback on `lf` that completes a Promise with the future's failure or
 * success.
 *
 * NOTE(review): the end of the callback registration and the expression returning the
 * promise's future are not visible in this listing - confirm against the original file.
 */
def asScala: Future[T] = {
val p = Promise[T]()
Futures.addCallback(lf, new FutureCallback[T] {
def onFailure(t: Throwable): Unit = p.failure(t)
def onSuccess(result: T): Unit = p.success(result)
/**
 * A DoFn from `I` to `Lookup[I, O]` that consults a cache and otherwise calls `lookupFn`
 * with the session's data client, emitting results in `finishBundle`.
 *
 * NOTE(review): this listing appears to have lost closing braces and some lines; the
 * @StartBundle / @ProcessElement / @FinishBundle annotations imported at the top of the
 * file are not visible on the methods below - confirm against the original file.
 *
 * @param sessionFn yields the BigtableSession for a given instance UUID
 * @param cacheFn   yields the Cache for a given instance UUID
 * @param lookupFn  lookup invoked with the session's data client and the input element
 */
case class BigtableDoFn[I, O](sessionFn: UUID => BigtableSession, cacheFn: UUID => Cache[I, O])(
lookupFn: LookupFn[I, O])
extends DoFn[I, Lookup[I, O]] {
// Random identifier generated at construction; passed to cacheFn and sessionFn below.
private[this] val instanceId = UUID.randomUUID()
// Queue of lookup futures; marked @transient and created lazily on first access.
@transient private[this] lazy val lookups =
new ConcurrentLinkedQueue[Future[WindowLookup[I, O]]]()
// NOTE(review): the body of startBundle is not visible in this listing.
def startBundle(): Unit = {
/**
 * Reads the current element, obtains the cache and session for `instanceId`, and builds a
 * windowed lookup for the element.
 *
 * NOTE(review): the expression chain below is truncated in this listing. No visible line
 * adds `lookup` to `lookups`, and no ExecutionContext for `.map` is visible among the
 * imports shown - confirm both against the original file.
 */
def processElement(ctx: ProcessContext, window: BoundedWindow): Unit = {
val input: I = ctx.element()
val cache = cacheFn(instanceId)
val session = sessionFn(instanceId)
// Option(...) turns a null result from getIfPresent into None.
val lookup = Option(cache.getIfPresent(input))
.getOrElse {
// Fallback branch: call lookupFn with the session's data client; the map callback puts
// the value into the cache.
lookupFn(session.getDataClient, input).map { value =>
cache.put(input, value)
// Pairs the input with result.toEither and wraps the pair together with the element's
// timestamp, window and pane.
.transform { result => { _ =>
val kv = KV.of(input, result.toEither)
ValueInSingleWindow.of(kv, ctx.timestamp(), window, ctx.pane())
/**
 * Sequences the queued lookup futures, outputs each lookup's value with its timestamp and
 * window through `ctx.output`, and blocks on the combined future with no timeout
 * (Duration.Inf).
 */
def finishBundle(ctx: FinishBundleContext): Unit = {
val finish = Future.sequence(lookups.asScala).map { lookups =>
lookups.foreach { lookup =>
ctx.output(lookup.getValue, lookup.getTimestamp, lookup.getWindow)
Await.result(finish, Duration.Inf)
/**
 * Type class (annotated with `@typeclass`) for obtaining a value of type `T` by id.
 */
@typeclass trait Resource[T] {
// Returns a T for `resourceId`; `builder` is a thunk that produces a T.
// NOTE(review): whether and when `builder` is invoked is decided by each instance.
def get(resourceId: String, builder: () => T): T
/**
 * Implicit Resource instances for Guava caches and Bigtable sessions, each backed by a
 * ConcurrentHashMap keyed by resource id.
 */
object Resource {
// Caches stored by resource id, with their key/value types erased to wildcards.
private val Caches = new ConcurrentHashMap[String, Cache[_, _]]()
/**
 * Resource instance for `Cache[K, B]`: stores the builder's result under `resourceId` via
 * computeIfAbsent, so the builder runs only when the id has no entry yet.
 *
 * NOTE(review): the cast back to `Cache[K, B]` is unchecked; an id already stored with
 * other type parameters would be returned as-is.
 */
implicit def guavaResource[K, B]: Resource[Cache[K, B]] =
(resourceId, builder) => {
val cache =
Caches.computeIfAbsent(resourceId, _ => builder().asInstanceOf[Cache[_, _]])
cache.asInstanceOf[Cache[K, B]]
// Sessions stored by resource id.
private val Sessions = new ConcurrentHashMap[String, BigtableSession]()
/**
 * Resource instance for BigtableSession: stores the builder's result under `resourceId`
 * via computeIfAbsent, so the builder runs only when the id has no entry yet.
 */
implicit def bigtableSession: Resource[BigtableSession] =
(resourceId, builder) => Sessions.computeIfAbsent(resourceId, _ => builder())
/**
 * A Cache implementation whose visible methods store nothing: bulk reads return empty maps,
 * mutators do nothing, size is 0 and getIfPresent returns null.
 *
 * NOTE(review): the Unit-returning methods are written `= Unit`, which evaluates the Unit
 * companion object and discards it; `= ()` is the conventional unit value.
 */
class NoOpCache[I, O] extends Cache[I, O] {
// Always an empty immutable map.
override def getAllPresent(keys: lang.Iterable[_]): ImmutableMap[I, O] = ImmutableMap.of()
// A new, empty ConcurrentHashMap on every call.
override def asMap(): ConcurrentMap[I, O] = new ConcurrentHashMap[I, O]()
override def invalidate(key: scala.Any): Unit = Unit
// The key and value are ignored.
override def put(key: I, value: O): Unit = Unit
override def invalidateAll(keys: lang.Iterable[_]): Unit = Unit
override def invalidateAll(): Unit = Unit
override def size(): Long = 0
// Statistics constructed with every counter set to zero.
override def stats(): CacheStats = new CacheStats(0, 0, 0, 0, 0, 0)
override def cleanUp(): Unit = Unit
override def putAll(m: util.Map[_ <: I, _ <: O]): Unit = Unit
// NOTE(review): the body of `get` is not visible in this listing - confirm against the
// original file.
override def get(key: I, loader: Callable[_ <: O]): O =
// Returns null (cast to O) for every key.
override def getIfPresent(key: scala.Any): O = null.asInstanceOf[O]
Copy link

Looked at it briefly. Some initial questions:

  • How does BigtableDataClient in BigtableDoFn survive serialization? And how do multiple cloned instances of BigtableDoFn share a single client?
  • Same question for Cache, how do BigtableDoFn cloned instances share it?
  • BigtableDataClient returns Guava ListenableFuture, right? Can we avoid the conversion to Scala Future? I'd also like to avoid users having to convert it themselves (more room for mistakes).

Copy link

regadas commented Dec 13, 2017

yup yup! you are totally right ... realized a bunch of stuff while running this... 🤦‍♂️

[info] instanceId: 8dda9f37-4249-4c90-8fbf-5f758ff2a72e client:
[info] instanceId: 8dda9f37-4249-4c90-8fbf-5f758ff2a72e client:
[info] instanceId: 8dda9f37-4249-4c90-8fbf-5f758ff2a72e client:
[info] instanceId: 8dda9f37-4249-4c90-8fbf-5f758ff2a72e client:

I'll be doing some changes...

Regarding the ListenableFuture, either:

  1. we can add an implicit class that gets brought into scope by
    import com.spotify.scio.bigtable.BigtableDoFn._ so that the user would just call ListenableFuture.asScala
  2. go even more restrictive: completely hide the client.readFlatRows implementation and only require the user to build the Request.

what do you think?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment