Skip to content

Instantly share code, notes, and snippets.

@sarkologist
Created June 10, 2020 12:28
Show Gist options
  • Save sarkologist/f4e99430c1cf163f53bc775e42592c4e to your computer and use it in GitHub Desktop.
Save sarkologist/f4e99430c1cf163f53bc775e42592c4e to your computer and use it in GitHub Desktop.
Apache Beam PCollection basic transforms
package gng.box.beam.utils
import com.spotify.scio.coders.{Beam, Coder, CoderMaterializer}
import org.apache.beam.sdk.coders.{CoderRegistry, KvCoder, Coder => BeamCoder}
import org.apache.beam.sdk.options.PipelineOptionsFactory
import org.apache.beam.sdk.values.KV
/** Helpers for turning Scio coders into Beam coders.
  *
  * Each conversion materializes the Scio coder against a freshly created default
  * `CoderRegistry` and freshly created default `PipelineOptions`.
  */
object CoderUtil {

  /** Materializes the Beam coder for `A` from the Scio `Coder[A]` found in implicit scope.
    *
    * @tparam A element type; a Scio `Coder[A]` must be implicitly available.
    * @return the Beam coder produced by [[toBeam]].
    */
  def beamCoderFor[A: Coder]: BeamCoder[A] = {
    val scioCoder = implicitly[Coder[A]]
    toBeam(scioCoder)
  }

  /** Converts a Scio coder into a Beam coder via `CoderMaterializer.beam`.
    *
    * @param coder the Scio coder to materialize.
    * @return the materialized Beam coder.
    */
  def toBeam[A](coder: Coder[A]): BeamCoder[A] = {
    val registry = CoderRegistry.createDefault()
    val options  = PipelineOptionsFactory.create()
    CoderMaterializer.beam(registry, options, coder)
  }

  /** Builds a Beam coder for `KV[K, V]` from the implicit Scio coders of `K` and `V`.
    *
    * The key and value coders are materialized first and combined with `KvCoder.of`. The
    * result is then wrapped as a Scio `Beam` coder and materialized once more through
    * [[toBeam]].
    *
    * @tparam K key type; a Scio `Coder[K]` must be implicitly available.
    * @tparam V value type; a Scio `Coder[V]` must be implicitly available.
    */
  def beamKvCoderFor[K: Coder, V: Coder]: BeamCoder[KV[K, V]] = {
    val keyCoder: BeamCoder[K]          = beamCoderFor[K]
    val valueCoder: BeamCoder[V]        = beamCoderFor[V]
    val kvCoder: BeamCoder[KV[K, V]]    = KvCoder.of(keyCoder, valueCoder)
    toBeam(Beam(kvCoder))
  }
}
package utils
import java.lang.{Boolean => JBoolean, Iterable => JIterable}
import com.spotify.scio.coders.Coder
import utils.CoderUtil.beamCoderFor
import org.apache.beam.sdk.coders.KvCoder
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple
import org.apache.beam.sdk.transforms._
import org.apache.beam.sdk.values.{KV, PCollection, PInput, POutput}
import scala.collection.JavaConverters._
/** Extension syntax that lets Beam `PCollection`s (and `KeyedPCollectionTuple`s) be
  * transformed in a Scala-collection style.
  *
  * Transforms that produce a new element type set the output coder explicitly. The coder
  * comes from a Scio `Coder` in implicit scope (via `beamCoderFor`), so Beam does not have to
  * infer a coder for erased generic Scala types.
  */
object PCollectionPimp {

  /** Element-wise `map` / `mapKv` / `filter` / `flatMap` over a `PCollection[A]`. */
  implicit class PCollectionCanFunction[A](pCollection: PCollection[A]) {

    /** Applies `f` to every element using `MapElements`.
      *
      * @param f function applied to each element. It is captured inside a Beam
      *          `InferableFunction`, so it must be serializable.
      * @tparam B output element type; the output coder is materialized from `Coder[B]`.
      * @return the mapped collection with its coder set.
      */
    def map[B: Coder](f: A => B): PCollection[B] =
      pCollection
        .apply(MapElements.via(new InferableFunction[A, B]() {
          override def apply(input: A): B = f(input)
        }))
        .setCoder(beamCoderFor[B])

    /** Maps the value of each `KV` element with `f` and keeps the key unchanged.
      *
      * This applies a single `MapElements` and sets a `KvCoder` built from `Coder[K]` and
      * `Coder[W]`. It deliberately does not go through [[map]]. That route would require an
      * implicit `Coder[KV[K, W]]` and would set a coder only to overwrite it straight away.
      *
      * @param f  function applied to each value.
      * @param ev evidence that the element type `A` is `KV[K, V]`.
      * @return a collection of `KV[K, W]` whose coder is `KvCoder(K, W)`.
      */
    def mapKv[K: Coder, V, W: Coder](f: V => W)(
        implicit ev: A =:= KV[K, V]): PCollection[KV[K, W]] =
      pCollection
        .apply(MapElements.via(new InferableFunction[A, KV[K, W]]() {
          override def apply(input: A): KV[K, W] = {
            val kv = ev(input)
            KV.of(kv.getKey, f(kv.getValue))
          }
        }))
        .setCoder(KvCoder.of(beamCoderFor[K], beamCoderFor[W]))

    /** Keeps only the elements for which `f` returns `true`, using Beam's `Filter.by`.
      *
      * No coder is set here. The output coder is left to Beam.
      * NOTE(review): this assumes `Filter` keeps the input coder; confirm for the Beam
      * version in use.
      *
      * @param f predicate applied to each element; it must be serializable.
      */
    def filter(f: A => Boolean): PCollection[A] =
      pCollection
        .apply(Filter.by(new InferableFunction[A, JBoolean]() {
          // Scala Boolean is boxed to java.lang.Boolean by the Predef implicit conversion.
          override def apply(input: A): JBoolean = f(input)
        }))

    /** Maps every element to zero or more outputs using `FlatMapElements`.
      *
      * @param f function returning a Scala `Iterable`. It is exposed to Beam as a Java
      *          `Iterable` via `asJava`.
      * @tparam B output element type; the output coder is materialized from `Coder[B]`.
      */
    def flatMap[B: Coder](f: A => Iterable[B]): PCollection[B] =
      pCollection
        .apply(FlatMapElements.via(new InferableFunction[A, JIterable[B]]() {
          override def apply(input: A): JIterable[B] = f(input).asJava
        }))
        .setCoder(beamCoderFor[B])
  }

  /** Adds `waitOn`, a thin wrapper around Beam's `Wait.on`. */
  implicit class PCollectionCanWait[A: Coder](pCollection: PCollection[A]) {

    /** Applies `Wait.on(waitedOn: _*)` to this collection.
      *
      * The step is named with this collection's name as prefix, and the output coder is set
      * from `Coder[A]`.
      *
      * Note: the single type parameter `B` means every collection passed in `waitedOn` must
      * share the same element type.
      *
      * @param waitedOn the signal collections handed to `Wait.on`.
      */
    def waitOn[B](waitedOn: PCollection[B]*): PCollection[A] =
      pCollection
        .applyWithNamePrefix(pCollection.getName,
                             Wait.on(waitedOn: _*): Wait.OnSignal[A])
        .setCoder(beamCoderFor[A])
  }

  /** Adds `applyWithNamePrefix` to `PCollection`. */
  implicit class PCollectionCanName[A](pCollection: PCollection[A]) {

    /** Applies `transform` under the step name `prefix + "/" + transform.getName`.
      *
      * @param prefix    string prepended to the transform's own name.
      * @param transform transform whose input type is a supertype of `PCollection[A]`.
      * @return whatever `transform` outputs.
      */
    def applyWithNamePrefix[PA >: PCollection[A] <: PInput, B <: POutput](
        prefix: String,
        transform: PTransform[PA, B]): B =
      pCollection
        .apply(prefix + "/" + transform.getName, transform)
  }

  /** Adds `applyWithNamePrefix` to `KeyedPCollectionTuple`. */
  implicit class KeyedPCollectionTupleCanName[A](
      pCollection: KeyedPCollectionTuple[A]) {

    /** Applies `transform` under the step name `prefix + "/" + transform.getName`.
      *
      * NOTE(review): the type parameter `X` is unused. It is kept only so that call sites
      * which spell out type arguments explicitly keep compiling.
      *
      * @param prefix    string prepended to the transform's own name.
      * @param transform transform consuming the keyed tuple.
      * @return whatever `transform` outputs.
      */
    def applyWithNamePrefix[X, B <: POutput](
        prefix: String,
        transform: PTransform[KeyedPCollectionTuple[A], B]): B =
      pCollection
        .apply(prefix + "/" + transform.getName, transform)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment