Skip to content

Instantly share code, notes, and snippets.

@fcecilia
Created September 13, 2016 08:32
Show Gist options
  • Save fcecilia/ee065196ea5f94802306692c40c1f16c to your computer and use it in GitHub Desktop.
Save fcecilia/ee065196ea5f94802306692c40c1f16c to your computer and use it in GitHub Desktop.
Implicit conversions for Kafka Streams
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.kstream.{KeyValueMapper, ValueMapper}
/**
 * Implicit conversions that provide some syntactic sugar when writing stream transformations:
 * plain Scala functions and tuples can be used where a `ValueMapper`, a `KeyValueMapper` or a
 * `KeyValue` is expected.
 */
object KeyValueImplicits {

  // Implicit conversions are a gated language feature in Scala 2; enabling it here, next to the
  // definitions, avoids the compiler's feature warning without touching the file-level imports.
  import scala.language.implicitConversions

  /**
   * Wraps a Scala function as a `ValueMapper`.
   *
   * @param f function applied to each value
   * @tparam V1 type of the input value
   * @tparam V2 type of the mapped value
   * @return a `ValueMapper` whose `apply` delegates to `f`
   */
  implicit def valueMapper[V1, V2](f: V1 => V2): ValueMapper[V1, V2] = new ValueMapper[V1, V2] {
    override def apply(value: V1): V2 = f(value)
  }

  /**
   * Wraps a Scala function over a `(key, value)` tuple as a `KeyValueMapper`.
   *
   * NOTE: the name is a misspelling of "keyValueMapper"; it is kept as-is because callers may
   * import this conversion by name.
   *
   * @param f function applied to the key and value packed into a single tuple
   * @tparam K type of the key
   * @tparam V type of the value
   * @tparam R type of the mapped result
   * @return a `KeyValueMapper` whose `apply` passes `(key, value)` to `f`
   */
  implicit def kayValueMapper[K, V, R](f: ((K, V)) => R): KeyValueMapper[K, V, R] = new KeyValueMapper[K, V, R] {
    // The mapper receives key and value as two arguments; `f` expects them as one tuple.
    override def apply(key: K, value: V): R = f((key, value))
  }

  /**
   * Converts a Scala pair into a `KeyValue`.
   *
   * @param tuple pair whose first element becomes the key and second element the value
   * @tparam K type of the key
   * @tparam V type of the value
   * @return a new `KeyValue` built from the two tuple elements
   */
  implicit def Tuple2ToKeyValue[K, V](tuple: (K, V)): KeyValue[K, V] = new KeyValue(tuple._1, tuple._2)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment