Skip to content

Instantly share code, notes, and snippets.

@rsumbaly
Last active September 10, 2017 14:52
Show Gist options
  • Save rsumbaly/c4a11326ae4455853f021ac63386f29b to your computer and use it in GitHub Desktop.
Save rsumbaly/c4a11326ae4455853f021ac63386f29b to your computer and use it in GitHub Desktop.
Making it easier to write KStream constructs in Scala
import java.util.Comparator
import org.apache.kafka.streams.kstream._
import org.apache.kafka.streams.processor.StreamPartitioner
/**
 * Implicit conversions from plain Scala functions to the single-method
 * interfaces imported at the top of this file (the Kafka Streams `kstream`
 * types, `StreamPartitioner`, and `java.util.Comparator`).
 *
 * Bring them into scope with `import KStreamImplicits._` so that a Scala
 * function literal can be passed wherever one of these interfaces is expected.
 * Every adapter simply delegates to the wrapped function.
 */
object KStreamImplicits {

  // Enables the implicit-conversion language feature for the definitions below,
  // so they do not raise a feature warning at their definition site.
  import scala.language.implicitConversions

  /**
   * Adapts a two-argument boolean function to a `Predicate`.
   *
   * @param f function invoked with the key and value passed to `test`
   * @return a `Predicate` whose `test` returns the result of `f`
   */
  implicit def scalaFunctionToPredicate[K, V](f: (K, V) => Boolean): Predicate[K, V] =
    new Predicate[K, V] {
      override def test(k: K, v: V): Boolean = f(k, v)
    }

  /**
   * Adapts a two-argument function to a `KeyValueMapper`.
   *
   * @param f function invoked with the key and value passed to `apply`
   * @return a `KeyValueMapper` whose `apply` returns the result of `f`
   */
  implicit def scalaFunctionToKeyValueMapper[K, V, VR](f: (K, V) => VR): KeyValueMapper[K, V, VR] =
    new KeyValueMapper[K, V, VR] {
      override def apply(k: K, v: V): VR = f(k, v)
    }

  /**
   * Adapts a one-argument function to a `ValueMapper`.
   *
   * @param f function invoked with the value passed to `apply`
   * @return a `ValueMapper` whose `apply` returns the result of `f`
   */
  implicit def scalaFunctionToValueMapper[V, VR](f: V => VR): ValueMapper[V, VR] =
    new ValueMapper[V, VR] {
      override def apply(v: V): VR = f(v)
    }

  /**
   * Adapts a three-argument function to a `StreamPartitioner`.
   *
   * @param f function invoked with the three arguments passed to `partition`
   * @return a `StreamPartitioner` whose `partition` returns the result of `f`
   */
  implicit def scalaFunctionToStreamPartitioner[K, V](f: (K, V, Int) => Int): StreamPartitioner[K, V] =
    new StreamPartitioner[K, V] {
      // NOTE(review): the result type is deliberately left inferred, exactly as
      // before; the declared result type of `StreamPartitioner.partition` is not
      // visible in this file, so confirm it before annotating this override.
      override def partition(k: K, v: V, p: Int) = f(k, v, p)
    }

  /**
   * Adapts a two-argument function to a `ValueJoiner`.
   *
   * @param f function invoked with the two values passed to `apply`
   * @return a `ValueJoiner` whose `apply` returns the result of `f`
   */
  implicit def scalaFunctionToValueJoiner[V1, V2, VR](f: (V1, V2) => VR): ValueJoiner[V1, V2, VR] =
    new ValueJoiner[V1, V2, VR] {
      override def apply(v1: V1, v2: V2): VR = f(v1, v2)
    }

  /**
   * Adapts a by-name expression to an `Initializer`.
   *
   * @param f by-name expression; it is evaluated again on every call to `apply`
   * @return an `Initializer` whose `apply` returns the value of `f`
   */
  implicit def scalaFunctionToInitializer[V1](f: => V1): Initializer[V1] =
    new Initializer[V1] {
      override def apply(): V1 = f
    }

  /**
   * Adapts a three-argument function to an `Aggregator`.
   *
   * @param f function invoked with the three arguments passed to `apply`
   * @return an `Aggregator` whose `apply` returns the result of `f`
   */
  implicit def scalaFunctionToAggregator[K, V, VA](f: (K, V, VA) => VA): Aggregator[K, V, VA] =
    new Aggregator[K, V, VA] {
      override def apply(k: K, v: V, a: VA): VA = f(k, v, a)
    }

  /**
   * Adapts a two-argument integer-valued function to a `java.util.Comparator`.
   *
   * @param f function invoked with the two objects passed to `compare`
   * @return a `Comparator` whose `compare` returns the result of `f`
   */
  implicit def scalaFunctionToComparator[K](f: (K, K) => Int): Comparator[K] =
    new Comparator[K] {
      override def compare(o1: K, o2: K): Int = f(o1, o2)
    }

  /**
   * Adapts a two-argument `Unit` function to a `ForeachAction`.
   *
   * @param f function invoked with the key and value passed to `apply`
   * @return a `ForeachAction` whose `apply` calls `f`
   */
  implicit def scalaFunctionToForeachAction[K, V](f: (K, V) => Unit): ForeachAction[K, V] =
    new ForeachAction[K, V] {
      override def apply(k: K, v: V): Unit = f(k, v)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment