Skip to content

Instantly share code, notes, and snippets.

@NajeebArif
Created May 16, 2022 19:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save NajeebArif/d3ad69ee3fc2b85fa8592b349a0ed43b to your computer and use it in GitHub Desktop.
Save NajeebArif/d3ad69ee3fc2b85fa8592b349a0ed43b to your computer and use it in GitHub Desktop.
package narif.poc.springkstreampoc;
import lombok.extern.slf4j.Slf4j;
import narif.poc.springkstreampoc.exceptions.InvalidCreditCardException;
import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;
@Configuration
@Slf4j
public class KafkaStreamsConfig {
@Bean
public StreamsBuilderFactoryBeanConfigurer streamsCustomizer() {
return new StreamsBuilderFactoryBeanConfigurer() {
@Override
public void configure(StreamsBuilderFactoryBean factoryBean) {
factoryBean.setStreamsUncaughtExceptionHandler(getStreamsUncaughtExceptionHandler());
}
@Override
public int getOrder() {
return Integer.MAX_VALUE - 10000;
}
};
}
private StreamsUncaughtExceptionHandler getStreamsUncaughtExceptionHandler() {
return exception -> {
Throwable cause = exception.getCause();
if (cause.getClass().equals(InvalidCreditCardException.class)) {
log.error(cause.getMessage());
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD;
}
return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment