Last active December 5, 2019 02:49
-
-
Save sarkologist/375c03a71ae81e265e93555299991557 to your computer and use it in GitHub Desktop.
CoGroupByKeyUtil
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Helpers for co-grouping two keyed PCollections into a single stream of `Either` values. */
object CoGroupByKeyUtil {

  /**
   * A workaround for Scala's lack of rank-2 polymorphism: the caller supplies a value that can
   * produce an `F[A]` for any `A`, so that the called function ([[eitherOf]]) decides how to
   * instantiate `F[_]` — here once for the left element type and once for the right one.
   */
  trait Rank2[F[_]] {
    def apply[A]: F[A]
  }

  /**
   * Windows both inputs with `window`, co-groups them by key with `CoGroupByKey`, and flattens
   * each co-grouped result into one `KV(key, Left(a))` per left value and one
   * `KV(key, Right(b))` per right value.
   *
   * @param left   first keyed input; its values are emitted wrapped in `Left`
   * @param right  second keyed input; its values are emitted wrapped in `Right`
   * @param window windowing transform, instantiated once per input element type
   * @return the flattened co-grouped elements, with the coder from
   *         `CoderUtil.beamCoderFor[KV[K, Either[A, B]]]`
   */
  def eitherOf[K: Coder, A: Coder, B: Coder](
      left: PCollection[KV[K, A]],
      right: PCollection[KV[K, B]],
      window: Rank2[Window]
  ): PCollection[KV[K, Either[A, B]]] = {
    val leftTag = new TupleTag[A]
    val rightTag = new TupleTag[B]
    val name = "CoGroupByKeyToEither: " + left.getTypeDescriptor + ", " + right.getTypeDescriptor
    // Both window transforms come from the same Rank2 factory and typically report the same
    // getName. With a shared prefix they would be applied under identical full step names, so
    // each input gets its own sub-prefix.
    // NOTE(review): this renames the window steps; a running job being updated in place may
    // need a transform-name mapping.
    val windowedLeft = left.applyWithNamePrefix(name + "/left", window.apply[KV[K, A]])
    val windowedRight = right.applyWithNamePrefix(name + "/right", window.apply[KV[K, B]])
    KeyedPCollectionTuple
      .of[K, A](leftTag, windowedLeft)
      .and[B](rightTag, windowedRight)
      .applyWithNamePrefix(name, CoGroupByKey.create[K])
      .applyWithNamePrefix(
        name,
        ParDo.of(new CoGroupByKeyToEitherDoFn[K, A, B](leftTag, rightTag)))
      .setCoder(CoderUtil.beamCoderFor[KV[K, Either[A, B]]])
  }

  type DoFnT[K, A, B] = DoFn[KV[K, CoGbkResult], KV[K, Either[A, B]]]

  /**
   * Flattens one co-grouped result: emits every `rightTag` value as `Right`, then every
   * `leftTag` value as `Left`, each paired with the element's key.
   *
   * @param leftTag  tag under which the left input's values were co-grouped
   * @param rightTag tag under which the right input's values were co-grouped
   */
  class CoGroupByKeyToEitherDoFn[K, A, B](leftTag: TupleTag[A], rightTag: TupleTag[B])
      extends DoFnT[K, A, B] {

    @ProcessElement
    def processElement(c: DoFnT[K, A, B]#ProcessContext): Unit = {
      val element = c.element()
      val key = element.getKey
      val grouped = element.getValue
      grouped.getAll(rightTag).asScala.foreach { x =>
        c.output(KV.of(key, Right(x)))
      }
      grouped.getAll(leftTag).asScala.foreach { x =>
        c.output(KV.of(key, Left(x)))
      }
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Adds `applyWithNamePrefix` to a `PCollection`, so that related steps can be grouped under a
 * common name prefix.
 */
implicit class PCollectionCanName[A](pCollection: PCollection[A]) {

  /**
   * Applies `transform` to the wrapped collection under the step name
   * `prefix + "/" + transform.getName`.
   *
   * @param prefix    name prefix for the applied step
   * @param transform transform whose input type is a supertype of `PCollection[A]`
   * @return whatever `transform` outputs
   */
  def applyWithNamePrefix[PA >: PCollection[A] <: PInput, B <: POutput](
      prefix: String,
      transform: PTransform[PA, B]): B = {
    val stepName = s"$prefix/${transform.getName}"
    pCollection.apply(stepName, transform)
  }
}
/**
 * Adds `applyWithNamePrefix` to a `KeyedPCollectionTuple`, mirroring the same helper for
 * plain `PCollection`s.
 */
implicit class KeyedPCollectionTupleCanName[A](
    pCollection: KeyedPCollectionTuple[A]) {

  /**
   * Applies `transform` to the wrapped tuple under the step name
   * `prefix + "/" + transform.getName`.
   *
   * The type parameter `X` is not used by this method; it is kept only so that existing call
   * sites keep compiling.
   *
   * @param prefix    name prefix for the applied step
   * @param transform transform consuming the keyed tuple
   * @return whatever `transform` outputs
   */
  def applyWithNamePrefix[X, B <: POutput](
      prefix: String,
      transform: PTransform[KeyedPCollectionTuple[A], B]): B = {
    val stepName = s"$prefix/${transform.getName}"
    pCollection.apply(stepName, transform)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// A Rank2 window factory: CoGroupByKeyUtil.eitherOf asks it for one Window per input element
// type, and each request is answered with Params.DELTA_WINDOW at that type.
val deltaWindow = new Rank2[Window] {
  override def apply[A]: Window[A] = Params.DELTA_WINDOW[A]
}

// Co-group the snapshot output with the metadata stream into Either values, then convert them
// to BigQuery TableRows.
// NOTE(review): FmdDQToTableRowTransform is constructed with an argument here, but the class
// definition in this file declares no constructor parameter — confirm which one is current.
CoGroupByKeyUtil
  .eitherOf(snapshotOutput, metadataByTopic, deltaWindow)
  .apply(
    "FmdDQ to TableRow",
    ParDo.of(
      new FmdDQToTableRowTransform(
        tradeDate.map { td =>
          (nanos: Long) => td.of(Instant.ofEpochSecond(0, nanos))
        }
      )
    )
  )
/**
 * Stateful DoFn over the `Either[FmdDQ, MetaData]` stream keyed by String.
 *
 * - `Right(metadata)` elements are stored in per-key value state.
 * - `Left(dq)` elements are buffered in per-key bag state while no metadata has been stored.
 *   Once metadata is present, the buffered elements plus the current one are each turned into
 *   a TableRow and output.
 *
 * NOTE(review): no `bagState.clear()` is visible after the buffered elements are emitted, so
 * every later `Left` for the same key would re-emit everything buffered so far. Confirm this,
 * and clear the bag after flushing if that was not intended.
 * NOTE(review): storing metadata does not itself flush the bag. Buffered elements are only
 * emitted when a subsequent `Left` arrives for the key.
 */
class FmdDQToTableRowTransform extends DoFn[KV[String, Either[FmdDQ, MetaData]], TableRow] { | |
// `final val` with a literal initializer is a constant, which the @StateId annotation argument requires.
final val METADATA = "metadata" | |
@StateId(METADATA) | |
// Per-key latest MetaData, written whenever a Right element arrives.
val metadataStateSpec: StateSpec[ValueState[Option[MetaData]]] = | |
StateSpecs.value(CoderUtil.beamCoderFor[Option[MetaData]]) | |
final val BAG = "bag" | |
@DoFn.StateId(BAG) | |
// Per-key buffer of FmdDQ elements that arrived before any metadata was stored.
val bagStateSpec: StateSpec[BagState[FmdDQ]] = | |
StateSpecs.bag(CoderUtil.beamCoderFor) | |
@ProcessElement | |
def processElement( | |
c: DoFn[KV[String, Either[FmdDQ, MetaData]], TableRow]#ProcessContext, | |
@StateId(BAG) bagState: BagState[FmdDQ], | |
@StateId(METADATA) metadataState: ValueState[Option[MetaData]] | |
): Unit = { | |
c.element.getValue.fold( | |
// Left: a data element.
dq => { | |
// Option(...) guards against read() returning null (presumably when nothing has been
// written yet for this key); flatten then collapses the stored Option.
Option(metadataState.read()).flatten.fold { | |
// No metadata yet: count the wait and buffer the element.
ScioMetrics | |
.counter[WriteBigQuery.type]("waiting_for_metadata") | |
.inc() | |
bagState.add(dq) | |
} { | |
metadata => | |
// Metadata available: emit everything buffered so far plus the current element.
// NOTE(review): the lambda parameter below shadows the outer `dq`.
(bagState.read().asScala ++ Seq(dq)).foreach { | |
dq => ... | |
val tr = new TableRow() | |
.set(... | |
c.output(tr) | |
} | |
} | |
}, | |
// Right: remember the latest metadata for this key.
metadata => metadataState.write(Some(metadata)) | |
) | |
} | |
} |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.