Created
April 7, 2018 14:54
-
-
Save edmondop/c3c6cf0a1cf15327392edfba428c0161 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Kafka Streams topology that joins the records of "myothertopic" against a table
 * derived from "mytopic" and publishes the joined records to "output-topic".
 *
 * The topology is assembled when the class is instantiated; nothing is consumed or
 * produced until [[start]] is called.
 *
 * @param streamsConfiguration key/value settings copied verbatim into the
 *                             `java.util.Properties` handed to `KafkaStreams`
 */
class SimpleJoin(streamsConfiguration: Map[String, AnyRef]) {

  private val builder = new KStreamBuilder

  // Source stream of MyClass records read from "mytopic".
  private val myStream =
    builder.stream[Integer, MyClass](Serdes.Integer(), myClassSerde, "mytopic")

  // Table of MyClass records keyed by `value.id`. Records sharing an id are combined
  // by PickLastReducer (presumably keeping the most recent one -- confirm against
  // PickLastReducer's implementation).
  private val myTable: KTable[Integer, MyClass] = myStream
    .groupBy(
      new KeyValueMapper[Integer, MyClass, Integer] {
        override def apply(key: Integer, value: MyClass): Integer = value.id
      },
      Serdes.Integer(),
      myClassSerde
    )
    .reduce(new PickLastReducer[MyClass])

  // Stream of MyOtherClass records from "myothertopic", re-keyed by `myClassId` so
  // that its key lines up with the key of `myTable`.
  private val myOtherClassStream: KStream[Integer, MyOtherClass] = builder
    .stream[Integer, MyOtherClass](Serdes.Integer(), myOtherClassSerde, "myothertopic")
    .selectKey(new KeyValueMapper[Integer, MyOtherClass, Integer] {
      override def apply(key: Integer, value: MyOtherClass): Integer = value.myClassId
    })

  // Stream-table join: each MyOtherClass record is paired with the table's MyClass
  // entry for the same key, then the result is re-keyed by `myClass.prop1`.
  private val joinedStreams = myOtherClassStream
    .join[MyClass, MyJoin](
      myTable,
      new ValueJoiner[MyOtherClass, MyClass, MyJoin] {
        override def apply(value1: MyOtherClass, value2: MyClass): MyJoin =
          MyJoin(value2, value1)
      },
      Serdes.Integer(),
      myOtherClassSerde
    )
    .selectKey[Integer](new KeyValueMapper[Integer, MyJoin, Integer] {
      override def apply(key: Integer, value: MyJoin): Integer = value.myClass.prop1
    })

  // Sinks: publish the joined records and also print them.
  joinedStreams.to(Serdes.Integer(), myJoinClassSerde, "output-topic")
  joinedStreams.print(Serdes.Integer(), myJoinClassSerde)

  private val promise = Promise[KafkaStreams]()

  /** Completes with the `KafkaStreams` instance once it reports the RUNNING state. */
  val future: scala.concurrent.Future[KafkaStreams] = promise.future

  /**
   * Builds a `KafkaStreams` instance from the topology and `streamsConfiguration`
   * and starts it. [[future]] is completed when the instance reaches RUNNING.
   */
  def start(): Unit = {
    val properties = new Properties()
    streamsConfiguration foreach {
      case (key, value) => properties.put(key, value)
    }
    //properties.put("num.stream.threads",java.lang.Integer.valueOf(4))
    val kafkaStreams = new KafkaStreams(builder, properties)

    // The listener has to be registered BEFORE start(): registering it afterwards
    // can miss the transition to RUNNING (leaving `future` incomplete forever) or,
    // on newer Kafka versions, is rejected because the instance has already left
    // the CREATED state.
    kafkaStreams.setStateListener(new StateListener {
      override def onChange(newState: KafkaStreams.State,
                            oldState: KafkaStreams.State): Unit = {
        logger.info(s"State changed from $oldState to $newState")
        if (newState == KafkaStreams.State.RUNNING) {
          logger.info(s"Topology $kafkaStreams")
          // trySuccess: RUNNING can be re-entered (e.g. after a rebalance) and the
          // promise may only be completed once.
          promise.trySuccess(kafkaStreams)
        }
      }
    })

    kafkaStreams.start()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment