Created
November 29, 2019 14:17
-
-
Save solveretur/fc4fdd6c7663dc4d58fe72d48029f9c3 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Setter | |
@Accessors(chain = true) | |
@Slf4j | |
final class MyStreamBuilder { | |
private StreamsBuilder streamsBuilder; | |
private MyService myService; | |
KStream<String, MyDataUpdate> build() { | |
final KStream<String, MyDataUpdate> stream = streamsBuilder.stream(Topics.INPUT); | |
stream | |
.map((key, value) -> { | |
final Optional<MyResult> myResult = myService.score(value); | |
final MyResult score = myResult.orElseThrow(() -> new RuntimeException()); | |
return KeyValue.pair(key, score); | |
}) | |
.to(Topics.OUTPUT); | |
return stream; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Configuration | |
@RequiredArgsConstructor | |
public class MyStreamConfiguration { | |
private final Environment env; | |
private final MyService myService; | |
@Bean | |
protected StreamsConfig defaultStreamsConfig() { | |
final HashMap<Object, Object> properties = new HashMap<>(); | |
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, env.getRequiredProperty("kafka.application-id")); | |
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, env.getRequiredProperty("kafka.bootstrap-servers")); | |
properties.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, env.getRequiredProperty("kafka.schema-registry-url")); | |
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest"); | |
properties.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 10); | |
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class); | |
return new StreamsConfig(properties); | |
} | |
@Bean | |
protected StreamsBuilderFactoryBean streamsBuilder() { | |
final StreamsBuilderFactoryBean streamsBuilderFactoryBean = new StreamsBuilderFactoryBean(); | |
streamsBuilderFactoryBean.setStreamsConfig(defaultStreamsConfig()); | |
return streamsBuilderFactoryBean; | |
} | |
@Bean | |
KStream<String, MyDataUpdate> myKStream(final StreamsBuilder builder) { | |
return new MyStreamBuilder() | |
.setStreamsBuilder(builder) | |
.setMyService(myService) | |
.build(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@Slf4j | |
@RequiredArgsConstructor | |
public final class MySuperService implements MyService { | |
@Override | |
public Optional<MyResult> score(final MyDataUpdate myDataUpdate) { | |
return Try | |
.of( | |
() -> { | |
return doSomethingThatReturnsResult(); | |
} | |
) | |
.onFailure(Throwable::printStackTrace) | |
.toOption() | |
.toJavaOptional(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment