Skip to content

Instantly share code, notes, and snippets.

@a-agmon
Last active July 7, 2022 19:21
def getStreamTopology(inputTopic:String):Topology = {
val builder = new StreamsBuilder()
val reqStream = builder.stream[String, PredictRequest](inputTopic)
reqStream
.map( (_, request) => {
Classifier.predict(request.recordID, request.featuresVector)
})
.split()
.branch((key, risk) => risk >= 0.5 ,
Branched.withConsumer(stream => stream.to("suspects-topic")))
.branch((key, risk) => risk < 0.5 ,
Branched.withConsumer(stream => stream.to("regular-topic")))
builder.build()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment