Skip to content

Instantly share code, notes, and snippets.

@solveretur
Created November 29, 2019 14:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save solveretur/fc4fdd6c7663dc4d58fe72d48029f9c3 to your computer and use it in GitHub Desktop.
Save solveretur/fc4fdd6c7663dc4d58fe72d48029f9c3 to your computer and use it in GitHub Desktop.
/**
 * Wires the scoring topology: every record read from {@code Topics.INPUT} is scored by
 * {@link MyService} and the result is written to {@code Topics.OUTPUT}.
 *
 * <p>Collaborators are supplied through the Lombok-generated chained setters and must be set
 * before {@link #build()} is called.
 */
@Setter
@Accessors(chain = true)
@Slf4j
final class MyStreamBuilder {

    private StreamsBuilder streamsBuilder;
    private MyService myService;

    /**
     * Registers the source, the scoring step and the sink on the configured builder.
     *
     * @return the source stream reading {@code Topics.INPUT} (not the scored stream, which is
     *     terminated by the sink)
     * @throws IllegalStateException at processing time, from inside the topology, when
     *     {@link MyService#score} yields an empty result for a record
     */
    KStream<String, MyDataUpdate> build() {
        final KStream<String, MyDataUpdate> stream = streamsBuilder.stream(Topics.INPUT);
        stream
            // Only the value is transformed, so mapValues is used instead of map: the key is
            // passed through untouched and the stream is not flagged as re-keyed.
            .mapValues((key, value) -> myService.score(value)
                // Fail with a message that identifies the record; an empty exception makes a
                // dead stream thread impossible to diagnose from the logs.
                .orElseThrow(() -> new IllegalStateException(
                    "No score produced for record with key '" + key + "' read from "
                        + Topics.INPUT)))
            .to(Topics.OUTPUT);
        return stream;
    }
}
/**
 * Spring configuration that declares the Kafka Streams settings, the
 * {@link StreamsBuilderFactoryBean} and the scoring stream built by {@link MyStreamBuilder}.
 */
@Configuration
@RequiredArgsConstructor
public class MyStreamConfiguration {

    private final Environment env;
    private final MyService myService;

    /**
     * Builds the Kafka Streams configuration.
     *
     * <p>The application id, bootstrap servers and schema registry URL are read from the
     * {@code kafka.*} properties of the Spring {@link Environment}; all other values are fixed
     * here.
     *
     * @return the assembled streams configuration
     */
    @Bean
    protected StreamsConfig defaultStreamsConfig() {
        // Environment-backed values, resolved up front.
        final String applicationId = env.getRequiredProperty("kafka.application-id");
        final String bootstrapServers = env.getRequiredProperty("kafka.bootstrap-servers");
        final String schemaRegistryUrl = env.getRequiredProperty("kafka.schema-registry-url");

        final HashMap<Object, Object> settings = new HashMap<>();

        // Connection and identity.
        settings.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
        settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        settings.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);

        // Default serdes: String keys, Avro specific-record values.
        settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);

        // Consumer overrides.
        settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
        settings.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10);

        return new StreamsConfig(settings);
    }

    /**
     * Creates the factory bean for the streams builder, configured with
     * {@link #defaultStreamsConfig()}.
     *
     * @return the configured factory bean
     */
    @Bean
    protected StreamsBuilderFactoryBean streamsBuilder() {
        final StreamsBuilderFactoryBean factory = new StreamsBuilderFactoryBean();
        factory.setStreamsConfig(defaultStreamsConfig());
        return factory;
    }

    /**
     * Registers the scoring topology on the given builder.
     *
     * @param builder the streams builder to register the topology on (presumably the one
     *     produced by {@link #streamsBuilder()} — confirm against the Spring wiring)
     * @return the stream returned by {@link MyStreamBuilder#build()}
     */
    @Bean
    KStream<String, MyDataUpdate> myKStream(final StreamsBuilder builder) {
        final MyStreamBuilder topology = new MyStreamBuilder();
        topology.setStreamsBuilder(builder);
        topology.setMyService(myService);
        return topology.build();
    }
}
@Slf4j
@RequiredArgsConstructor
public final class MySuperService implements MyService {
@Override
public Optional<MyResult> score(final MyDataUpdate myDataUpdate) {
return Try
.of(
() -> {
return doSomethingThatReturnsResult();
}
)
.onFailure(Throwable::printStackTrace)
.toOption()
.toJavaOptional();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment