Skip to content

Instantly share code, notes, and snippets.

View NajeebArif's full-sized avatar
💭
I may be slow to respond.

Najeeb Arif NajeebArif

💭
I may be slow to respond.
View GitHub Profile
package narif.poc.springkstreampoc;
import lombok.extern.slf4j.Slf4j;
import narif.poc.springkstreampoc.model.OrderInputMsg;
import narif.poc.springkstreampoc.model.Tuple;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
spring:
cloud:
stream:
bindings:
orderBranchingProcessor-in-0:
destination: raw-order-topic
orderBranchingProcessor-out-0:
destination: order-topic
orderBranchingProcessor-out-1:
private KeyValue<String, Tuple<Throwable, OrderInputMsg>> getTransformedMessage(String key, OrderInputMsg value) {
try{
final OrderInputMsg msg = OrderProcessorService.processOrderMsg(value);
final Tuple<Throwable, OrderInputMsg> inputMsgTuple = new Tuple<>(null, msg);
return new KeyValue<>(key, inputMsgTuple);
}catch (Exception e){
final Tuple<Throwable, OrderInputMsg> inputMsgTuple = new Tuple<>(e, value);
return new KeyValue<>(key, inputMsgTuple);
}
}
@Bean
public Function<KStream<String, OrderInputMsg>, KStream<String, OrderInputMsg>> orderProcessWithSideEffect(){
return stringOrderInputMsgKStream -> {
KStream<String, String> stringStringKStream = stringOrderInputMsgKStream
.mapValues((readOnlyKey, value) -> value.getOrderId());
stringStringKStream.to("out-textMsg-topic-0", Produced.with(Serdes.String(), Serdes.String()));
return stringOrderInputMsgKStream
.mapValues((readOnlyKey, value) -> OrderProcessorService.processOrderMsg(value));
};
}