Skip to content

Instantly share code, notes, and snippets.

@NajeebArif
Created May 16, 2022 20:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NajeebArif/abae0164cba85d901f4dc9aea03437e2 to your computer and use it in GitHub Desktop.
Save NajeebArif/abae0164cba85d901f4dc9aea03437e2 to your computer and use it in GitHub Desktop.
package narif.poc.springkstreampoc;
import lombok.extern.slf4j.Slf4j;
import narif.poc.springkstreampoc.model.OrderInputMsg;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.ValueMapper;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import java.util.function.Function;
@SpringBootApplication
@Slf4j
public class SpringKStreamPocApplication {
public static void main(String[] args) {
SpringApplication.run(SpringKStreamPocApplication.class, args);
}
@Bean
public Function<KStream<String, OrderInputMsg>, KStream<String, OrderInputMsg>> orderProcessor(){
return stringOrderInputMsgKStream -> stringOrderInputMsgKStream
.peek((key, value) -> log.info("Order input msg received with key: {} and payload: {}", key, value))
.mapValues(OrderProcessorService::processOrderMsg);
}
@Bean
public Function<KStream<String, String>, KStream<String, String>> upperCaseProcessor(){
return stringStringKStream -> stringStringKStream
.mapValues((ValueMapper<String, String>) String::toUpperCase);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment