Skip to content

Instantly share code, notes, and snippets.

@sarkologist
Last active December 5, 2019 02:49
Show Gist options
  • Save sarkologist/375c03a71ae81e265e93555299991557 to your computer and use it in GitHub Desktop.
Save sarkologist/375c03a71ae81e265e93555299991557 to your computer and use it in GitHub Desktop.
CoGroupByKeyUtil
object CoGroupByKeyUtil {

  /** Stand-in for a rank-2 polymorphic value: something able to produce an `F[A]` for any `A`.
    *
    * Scala 2 function values cannot themselves be polymorphic, so the quantification over `A`
    * is placed on `apply`. This lets `eitherOf` choose, per input, the element type at which
    * `F` is instantiated.
    */
  trait Rank2[F[_]] {
    def apply[A]: F[A]
  }

  /** Windows two keyed collections, co-groups them by key and flattens each group into
    * `Either` values.
    *
    * Values from `left` are emitted as `Left`, values from `right` as `Right`, each under its
    * original key. Within one key's group, all `Right` values are output before the `Left`
    * values.
    *
    * @param left   first keyed input; its values become `Left`
    * @param right  second keyed input; its values become `Right`
    * @param window supplies the `Window` transform applied to each input before grouping
    * @return the flattened collection, with its coder set via `CoderUtil.beamCoderFor`
    */
  def eitherOf[K: Coder, A: Coder, B: Coder](
      left: PCollection[KV[K, A]],
      right: PCollection[KV[K, B]],
      window: Rank2[Window]
  ): PCollection[KV[K, Either[A, B]]] = {
    val leftTag = new TupleTag[A]
    val rightTag = new TupleTag[B]
    // Every transform applied below is named under this common prefix.
    val prefix = s"CoGroupByKeyToEither: ${left.getTypeDescriptor}, ${right.getTypeDescriptor}"

    // NOTE(review): both inputs are windowed under the same prefix; if the two Window transforms
    // report the same getName, Beam has to de-duplicate the resulting names — confirm intent.
    val windowedLeft = left.applyWithNamePrefix(prefix, window.apply[KV[K, A]])
    val leftOnly = KeyedPCollectionTuple.of[K, A](leftTag, windowedLeft)
    val windowedRight = right.applyWithNamePrefix(prefix, window.apply[KV[K, B]])
    val bothSides = leftOnly.and[B](rightTag, windowedRight)

    val grouped = bothSides.applyWithNamePrefix(prefix, CoGroupByKey.create[K])
    val flattened = grouped.applyWithNamePrefix(
      prefix,
      ParDo.of(new CoGroupByKeyToEitherDoFn[K, A, B](leftTag, rightTag)))
    flattened.setCoder(CoderUtil.beamCoderFor[KV[K, Either[A, B]]])
  }

  /** Shorthand for a `DoFn` that turns a co-grouped result into keyed `Either` values. */
  type DoFnT[K, A, B] = DoFn[KV[K, CoGbkResult], KV[K, Either[A, B]]]

  /** Re-emits every value of a `CoGbkResult`: values tagged `rightTag` as `Right`, then values
    * tagged `leftTag` as `Left`, all under the element's key.
    */
  class CoGroupByKeyToEitherDoFn[K, A, B](leftTag: TupleTag[A], rightTag: TupleTag[B])
      extends DoFnT[K, A, B] {

    @ProcessElement
    def processElement(c: DoFnT[K, A, B]#ProcessContext): Unit = {
      val element = c.element()
      val key = element.getKey
      val groups = element.getValue
      for (b <- groups.getAll(rightTag).asScala) c.output(KV.of(key, Right(b)))
      for (a <- groups.getAll(leftTag).asScala) c.output(KV.of(key, Left(a)))
    }
  }
}
/** Adds `applyWithNamePrefix` to any `PCollection`.
  *
  * The method applies `transform` under the name `prefix + "/" + transform.getName`, so related
  * transforms can be grouped under a shared prefix.
  */
implicit class PCollectionCanName[A](pCollection: PCollection[A]) {
  def applyWithNamePrefix[PA >: PCollection[A] <: PInput, B <: POutput](
      prefix: String,
      transform: PTransform[PA, B]): B = {
    val qualifiedName = s"$prefix/${transform.getName}"
    pCollection.apply(qualifiedName, transform)
  }
}
/** Adds `applyWithNamePrefix` to a `KeyedPCollectionTuple`.
  *
  * The method applies `transform` under the name `prefix + "/" + transform.getName`.
  *
  * NOTE(review): type parameter `X` is not referenced anywhere in the signature, so it is always
  * inferred (as `Nothing`) unless supplied explicitly; it is kept for source compatibility.
  */
implicit class KeyedPCollectionTupleCanName[A](
    pCollection: KeyedPCollectionTuple[A]) {
  def applyWithNamePrefix[X, B <: POutput](
      prefix: String,
      transform: PTransform[KeyedPCollectionTuple[A], B]): B = {
    val qualifiedName = s"$prefix/${transform.getName}"
    pCollection.apply(qualifiedName, transform)
  }
}
/** Window factory for `CoGroupByKeyUtil.eitherOf`: yields `Params.DELTA_WINDOW` at whatever
  * element type is requested.
  */
val deltaWindow: Rank2[Window] = new Rank2[Window] {
  override def apply[A]: Window[A] = Params.DELTA_WINDOW[A]
}
locally {
  // Left values come from snapshotOutput, Right values from metadataByTopic; both inputs are
  // windowed with deltaWindow before being co-grouped.
  val snapshotsOrMetadata =
    CoGroupByKeyUtil.eitherOf(snapshotOutput, metadataByTopic, deltaWindow)
  snapshotsOrMetadata.apply(
    "FmdDQ to TableRow",
    ParDo.of(new FmdDQToTableRowTransform(tradeDate.map { date =>
      (nanos: Long) => date.of(Instant.ofEpochSecond(0, nanos))
    }))
  )
}
/** Stateful `DoFn` that pairs each `FmdDQ` with the latest `MetaData` seen for its key and
  * outputs `TableRow`s.
  *
  * Each input value is handled as follows:
  *  - `Right(metadata)` overwrites the per-key metadata state.
  *  - `Left(dq)` before any metadata exists is buffered in a bag and counted under
  *    `waiting_for_metadata`.
  *  - `Left(dq)` once metadata exists: every buffered `FmdDQ` plus `dq` is turned into a
  *    `TableRow` and output. The bag is then cleared, so buffered elements are emitted once,
  *    not again with every later `FmdDQ` for the key.
  *
  * NOTE(review): buffered elements are only flushed when a later `FmdDQ` arrives for the same
  * key; the arrival of `MetaData` alone does not flush them. Consider flushing on metadata
  * arrival or via a timer if that matters.
  */
class FmdDQToTableRowTransform extends DoFn[KV[String, Either[FmdDQ, MetaData]], TableRow] {
  final val METADATA = "metadata"

  /** Latest metadata for the key; unset until the first `Right` arrives. */
  @StateId(METADATA)
  val metadataStateSpec: StateSpec[ValueState[Option[MetaData]]] =
    StateSpecs.value(CoderUtil.beamCoderFor[Option[MetaData]])

  final val BAG = "bag"

  /** `FmdDQ` values that arrived while no metadata was available for their key. */
  @DoFn.StateId(BAG)
  val bagStateSpec: StateSpec[BagState[FmdDQ]] =
    StateSpecs.bag(CoderUtil.beamCoderFor)

  @ProcessElement
  def processElement(
      c: DoFn[KV[String, Either[FmdDQ, MetaData]], TableRow]#ProcessContext,
      @StateId(BAG) bagState: BagState[FmdDQ],
      @StateId(METADATA) metadataState: ValueState[Option[MetaData]]
  ): Unit = {
    c.element.getValue.fold(
      dq => {
        // Option(...) guards against read() yielding null for a state cell never written.
        Option(metadataState.read()).flatten.fold {
          ScioMetrics
            .counter[WriteBigQuery.type]("waiting_for_metadata")
            .inc()
          bagState.add(dq)
        } {
          metadata =>
            // NOTE(review): the row construction below is elided in this snippet.
            (bagState.read().asScala ++ Seq(dq)).foreach {
              dq => ...
              val tr = new TableRow()
                .set(...
              c.output(tr)
            }
            // The buffered elements have just been emitted; drop them so that later FmdDQ
            // values for this key do not re-emit them (previously produced duplicate rows).
            bagState.clear()
        }
      },
      metadata => metadataState.write(Some(metadata))
    )
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment