Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Snippet of TransfersRecordingServiceJoinStreams.java
final KStream<String, Transfer> transfersKStream =
createInputKStream(stringSerde, configValues.getSchemaRegistryUrl(),
builder);
final Serde<AccountBalance> accountBalanceSerde =
createAccountBalanceSerde(configValues.getSchemaRegistryUrl());
final KTable<String,AccountBalance> accountBalancesKTable =
createInputKTable(stringSerde, accountBalanceSerde, builder);
final KStream<String, AccountBalance> newABstream = transfersKStream.join(
accountBalancesKTable, new TransferAndAccountBalanceJoiner());
newABstream.to(ACCOUNT_BALANCES_TOPIC, Produced.with(stringSerde, accountBalanceSerde));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment