Skip to content

Instantly share code, notes, and snippets.

@edmondop
Created April 7, 2018 14:54
Show Gist options
  • Save edmondop/c3c6cf0a1cf15327392edfba428c0161 to your computer and use it in GitHub Desktop.
Save edmondop/c3c6cf0a1cf15327392edfba428c0161 to your computer and use it in GitHub Desktop.
class SimpleJoin(streamsConfiguration:Map[String,AnyRef]){
private val builder = new KStreamBuilder
private val myStream = builder.stream[Integer,MyClass](Serdes.Integer(),myClassSerde, "mytopic")
private val myTable: KTable[Integer, MyClass] = myStream.groupBy(new KeyValueMapper[Integer,MyClass,Integer] {
override def apply(key : Integer,
value: MyClass): Integer = {
value.id
}
},Serdes.Integer(),myClassSerde).reduce(new PickLastReducer[MyClass])
private val myOtherClassStream :KStream[Integer,MyOtherClass] = builder.stream[Integer,MyOtherClass](Serdes.Integer(),myOtherClassSerde, "myothertopic").selectKey(
new KeyValueMapper[Integer,MyOtherClass,Integer] {
override def apply(key: Integer,
value: MyOtherClass): Integer = {
value.myClassId
}
})
private val joinedStreams = myOtherClassStream.join[MyClass,MyJoin](myTable,new ValueJoiner[MyOtherClass,MyClass,MyJoin] {
override def apply(value1: MyOtherClass,
value2: MyClass): MyJoin = MyJoin(value2,value1)
},Serdes.Integer(),myOtherClassSerde).selectKey[Integer](new KeyValueMapper[Integer,MyJoin,Integer]{
override def apply(key : Integer,
value: MyJoin): Integer = value.myClass.prop1
})
joinedStreams.to(Serdes.Integer(),myJoinClassSerde,"output-topic")
joinedStreams.print(Serdes.Integer(),myJoinClassSerde)
private val promise = Promise[KafkaStreams]()
val future = promise.future
def start() = {
val properties = new Properties()
streamsConfiguration foreach {
case (key,value) => properties.put(key,value)
}
//properties.put("num.stream.threads",java.lang.Integer.valueOf(4))
val kafkaStreams = new KafkaStreams(builder,properties)
kafkaStreams.start()
kafkaStreams.state().isRunning
kafkaStreams.setStateListener(new StateListener {
override def onChange(newState: KafkaStreams.State,
oldState: KafkaStreams.State): Unit = {
logger.info(s"State changed from $oldState to $newState")
if(newState==KafkaStreams.State.RUNNING){
logger.info(s"Topology $kafkaStreams")
promise.trySuccess(kafkaStreams)
}
}
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment