Skip to content

Instantly share code, notes, and snippets.

@fsarradin
Created April 5, 2022 20:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fsarradin/d0a76321669ff3bffc9fd9934d646bb8 to your computer and use it in GitHub Desktop.
Save fsarradin/d0a76321669ff3bffc9fd9934d646bb8 to your computer and use it in GitHub Desktop.
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.streams.kstream.{Named, ValueTransformerWithKey}
import org.apache.kafka.streams.processor.ProcessorContext
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.kstream.KStream
import org.apache.kafka.streams.state.{
KeyValueBytesStoreSupplier,
KeyValueStore,
Stores
}
/** Extra stream operators that register their own state stores on the wrapped
  * `StreamsBuilder`.
  *
  * @param builder
  *   builder on which the state stores backing each operator are added
  */
class ExtraStreamBuilder(builder: StreamsBuilder) {

  /** Joins two streams sharing the same key type through a persistent key-value store.
    *
    * Every record of `left` and `right` is wrapped into a [[Join]] holding only its own
    * side, both streams are merged, and each merged record is combined with the [[Join]]
    * currently stored for its key. The combined value is then handed to `joiner`; a
    * record is emitted only when `joiner` returns a defined value.
    *
    * The state store is named `name` followed by `-join-table`, and every processor
    * created here is named after `name` as well.
    *
    * @param left      stream feeding the left side of the join
    * @param right     stream feeding the right side of the join
    * @param name      prefix used for the store name and the processor names
    * @param joiner    builds the optional output from the key and both optional sides
    * @param keySerde  serde used by the state store for keys
    * @param joinSerde serde used by the state store for the stored [[Join]] values
    * @return the stream of values produced by `joiner`
    */
  def join[K, A, B, R](left: KStream[K, A], right: KStream[K, B], name: String)(
      joiner: (K, Option[A], Option[B]) => Option[R]
  )(
      keySerde: Serde[K],
      joinSerde: Serde[Join[A, B]]
  ): KStream[K, R] = {
    val storeName = s"$name-join-table"

    // The store must be registered on the builder before a processor refers to it by name.
    val storeSupplier: KeyValueBytesStoreSupplier =
      Stores.persistentKeyValueStore(storeName)
    val storeBuilder =
      Stores.keyValueStoreBuilder(storeSupplier, keySerde, joinSerde)
    builder.addStateStore(storeBuilder)

    // Option(...) turns a null payload into None, so a null record carries no side at all.
    val leftAsJoin: KStream[K, Join[A, B]] =
      left.mapValues(
        (a: A) => Join[A, B](Option(a), None),
        Named.as(s"$name-left-mapto-join")
      )
    val rightAsJoin: KStream[K, Join[A, B]] =
      right.mapValues(
        (b: B) => Join[A, B](None, Option(b)),
        Named.as(s"$name-right-mapto-join")
      )

    val merged: KStream[K, Join[A, B]] =
      leftAsJoin.merge(rightAsJoin, Named.as(s"$name-merge"))

    merged.flatTransformValues(
      () => joinTransformer[K, A, B, R](joiner, storeName),
      Named.as(s"$name-join-streams"),
      storeName
    )
  }

  /** Builds the transformer that folds each incoming one-sided [[Join]] into the value
    * stored under the same key and applies `joiner` to the outcome.
    *
    * @param joiner    builds the optional output from the key and both optional sides
    * @param storeName name of the key-value store looked up in `init`
    */
  private def joinTransformer[K, A, B, R](
      joiner: (K, Option[A], Option[B]) => Option[R],
      storeName: String
  ): ValueTransformerWithKey[K, Join[A, B], Iterable[R]] =
    new ValueTransformerWithKey[K, Join[A, B], Iterable[R]] {
      // Assigned in init(), which hands over the processor context.
      private var store: KeyValueStore[K, Join[A, B]] = _

      override def init(context: ProcessorContext): Unit = {
        store = context.getStateStore[KeyValueStore[K, Join[A, B]]](storeName)
      }

      override def transform(key: K, value: Join[A, B]): Iterable[R] = {
        val current: Join[A, B] =
          Option(store.get(key)) match {
            case None =>
              // First record seen for this key: store it as is.
              store.put(key, value)
              value
            case Some(previous) if value.left.isEmpty && value.right.isEmpty =>
              // Nothing new on either side: keep the stored value, no write.
              previous
            case Some(previous) =>
              // Each side defined in the incoming value overrides the stored one;
              // an undefined side falls back to what was stored.
              val combined = Join[A, B](
                value.left.orElse(previous.left),
                value.right.orElse(previous.right)
              )
              store.put(key, combined)
              combined
          }
        current.apply(key, joiner).toSeq
      }

      override def close(): Unit = ()
    }

  /** Forwards a record only when its value differs from the last value stored for the
    * same key, or when no value is stored for that key yet.
    *
    * The state store is named `name` followed by `-emitonchange-table`.
    *
    * @param stream     stream to filter
    * @param name       prefix used for the store name and the processor name
    * @param keySerde   serde used by the state store for keys
    * @param valueSerde serde used by the state store for values
    * @return the stream restricted to records whose value changed
    */
  def emitOnChange[K, V](stream: KStream[K, V], name: String)(
      keySerde: Serde[K],
      valueSerde: Serde[V]
  ): KStream[K, V] = {
    val storeName = s"$name-emitonchange-table"

    val supplier = Stores.persistentKeyValueStore(storeName)
    val lastValueStoreBuilder =
      Stores.keyValueStoreBuilder(supplier, keySerde, valueSerde)
    builder.addStateStore(lastValueStoreBuilder)

    stream.flatTransformValues(
      () => emitOnChangeTransformer[K, V](storeName),
      Named.as(s"$name-emitonchange-filter"),
      storeName
    )
  }

  /** Builds the transformer comparing each value (with `==`) to the one stored under the
    * same key, storing and emitting it only when they differ or nothing is stored.
    *
    * @param storeName name of the key-value store looked up in `init`
    */
  private def emitOnChangeTransformer[K, V](
      storeName: String
  ): ValueTransformerWithKey[K, V, Iterable[V]] =
    new ValueTransformerWithKey[K, V, Iterable[V]] {
      // Assigned in init(), which hands over the processor context.
      private var lastValues: KeyValueStore[K, V] = _

      override def init(context: ProcessorContext): Unit = {
        lastValues = context.getStateStore[KeyValueStore[K, V]](storeName)
      }

      override def transform(key: K, value: V): Iterable[V] = {
        val previous = Option(lastValues.get(key))
        if (previous.contains(value)) Nil
        else {
          lastValues.put(key, value)
          List(value)
        }
      }

      override def close(): Unit = ()
    }
}
/** Pair of optional values, one per side of a two-way join.
  *
  * @param left  value of the left side, if any
  * @param right value of the right side, if any
  */
case class Join[A, B](left: Option[A], right: Option[B]) {

  /** Applies `joiner` to `key` and to both sides of this join.
    *
    * @param key    key the two sides belong to
    * @param joiner function building the optional result from the key and both sides
    * @return whatever `joiner` returns for `(key, left, right)`
    */
  def apply[K, R](
      key: K,
      joiner: (K, Option[A], Option[B]) => Option[R]
  ): Option[R] = {
    val arguments = (key, left, right)
    joiner.tupled(arguments)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment