Skip to content

Instantly share code, notes, and snippets.

View NajeebArif's full-sized avatar
💭
I may be slow to respond.

Najeeb Arif NajeebArif

💭
I may be slow to respond.
View GitHub Profile
/**
 * Builds the stream function that processes incoming orders while also
 * publishing a secondary stream as a side effect.
 *
 * <p>For every record on the input stream, the order id (as returned by
 * {@code getOrderId()}) is written to the topic {@code out-textMsg-topic-0}
 * using String serdes for both key and value. Independently of that branch,
 * the returned stream carries the same keys with each value replaced by the
 * result of {@code OrderProcessorService.processOrderMsg}.
 *
 * @return a function from the input order stream to the processed order stream
 */
@Bean
public Function<KStream<String, OrderInputMsg>, KStream<String, OrderInputMsg>> orderProcessWithSideEffect(){
    return orders -> {
        // Side-effect branch: project each order to its id and send it to a fixed topic.
        orders
                .mapValues((key, order) -> order.getOrderId())
                .to("out-textMsg-topic-0", Produced.with(Serdes.String(), Serdes.String()));

        // Main branch: the stream handed back to the caller carries the processed orders.
        return orders.mapValues((key, order) -> OrderProcessorService.processOrderMsg(order));
    };
}
/**
 * Processes a single order and captures the outcome, success or failure,
 * as a value instead of letting an exception propagate.
 *
 * <p>On success the tuple holds {@code null} as its first element and the
 * result of {@code OrderProcessorService.processOrderMsg} as its second.
 * If an {@link Exception} is thrown, the tuple holds that exception as its
 * first element and the original, unprocessed {@code value} as its second.
 *
 * @param key   the record key, passed through unchanged
 * @param value the incoming order message to process
 * @return a key/value pair whose value is the outcome tuple described above
 */
private KeyValue<String, Tuple<Throwable, OrderInputMsg>> getTransformedMessage(String key, OrderInputMsg value) {
    Tuple<Throwable, OrderInputMsg> outcome;
    try {
        outcome = new Tuple<>(null, OrderProcessorService.processOrderMsg(value));
    } catch (Exception failure) {
        // Keep the original payload next to the failure so later steps can still inspect it.
        outcome = new Tuple<>(failure, value);
    }
    return new KeyValue<>(key, outcome);
}
spring:
cloud:
stream:
bindings:
orderBranchingProcessor-in-0:
destination: raw-order-topic
orderBranchingProcessor-out-0:
destination: order-topic
orderBranchingProcessor-out-1:
package narif.poc.springkstreampoc;
import lombok.extern.slf4j.Slf4j;
import narif.poc.springkstreampoc.model.OrderInputMsg;
import narif.poc.springkstreampoc.model.Tuple;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Predicate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
2022-05-17 02:43:20.772 INFO 7705 --- [-StreamThread-1] n.p.s.SpringKStreamPocApplication : Order input msg received with key: 157ff176-88c7-4dde-8d05-d80b4d14ff3c and payload: OrderInputMsg(orderId=6615e92f-fb23-4ebd-8a46-9f89bf8b71cf, itemName=PS5 0, userName=Najeeb, creditCardNumber=1111-2222-3333-5555, orderAmount=1000.0)
2022-05-17 02:43:20.778 INFO 7705 --- [-StreamThread-1] n.p.s.SpringKStreamPocApplication : ORDER CREDIT CARD INFO MASKED FOR KEY: 157ff176-88c7-4dde-8d05-d80b4d14ff3c, VALUE:OrderInputMsg(orderId=6615e92f-fb23-4ebd-8a46-9f89bf8b71cf, itemName=PS5 0, userName=Najeeb, creditCardNumber=XXXX-XXXX-XXXX-5555, orderAmount=1000.0)
2022-05-17 02:43:20.791 INFO 7705 --- [read-1-producer] org.apache.kafka.clients.Metadata : [Producer clientId=OrderProcessorStream-8b163842-a326-4f42-acc3-32301b6f65b8-StreamThread-1-producer] Resetting the last seen epoch of partition sane-order-topic-0 to 0 since the associated topicId changed from null to sxmI1cSKSLOz01BUdQ68nw
2022-05-17 0
package narif.poc.springkstreampoc;
import lombok.extern.slf4j.Slf4j;
import narif.poc.springkstreampoc.model.OrderInputMsg;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.boot.SpringApplication;
spring:
cloud:
stream:
bindings:
orderProcessor-in-0:
destination: raw-order-topic
orderProcessor-out-0:
destination: sane-order-topic
upperCaseProcessor-in-0:
package narif.poc.springkstreampoc;
import lombok.extern.slf4j.Slf4j;
import narif.poc.springkstreampoc.model.OrderInputMsg;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
package narif.poc.springkstreampoc;
import narif.poc.springkstreampoc.model.OrderInputMsg;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import java.util.Properties;
import java.util.UUID;
package narif.poc.springkstreampoc;
import lombok.extern.slf4j.Slf4j;
import narif.poc.springkstreampoc.exceptions.InvalidCreditCardException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;