Skip to content

Instantly share code, notes, and snippets.

@miguno
Last active July 19, 2017 13:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save miguno/ed7db54fc24697561de38c48bc42c827 to your computer and use it in GitHub Desktop.
Save miguno/ed7db54fc24697561de38c48bc42c827 to your computer and use it in GitHub Desktop.
KStream<CustomerId, KeyValue<SpecificRecord, OutboundMessage>> addressedMessages =
builder.<AccountId, AccountEntry>stream(accountEntryStream)
.leftJoin(accountToCustomerIds, (accountEntry, customerIds) -> {
if (isNull(customerIds)) {
return Collections.<KeyValue<CustomerId, AccountEntry>>emptyList();
} else {
return customerIds.getCustomerIds().stream()
.map(customerId -> KeyValue.pair(customerId, accountEntry))
.collect(toList());
}
})
.flatMap((accountId, accountentryByCustomer) -> accountentryByCustomer)
.through(customerIdToAccountEntryStream)
.leftJoin(alertSettings, Pair::with)
.flatMapValues(
(Pair<AccountEntry, CustomerAlertSettings> accountEntryAndSettings) ->
BalanceAlertsGenerator.generateAlerts(
accountEntryAndSettings.getValue0(),
accountEntryAndSettings.getValue1())
);
// Send all Email messages from addressedMessages
addressedMessages
.filter((e, kv) -> kv.key instanceof EmailAddress)
.map((k, v) -> v)
.to(emailMessageStream);
// Send all Sms messages from addressedMessages
addressedMessages
.filter((e, kv) -> kv.key instanceof PhoneNumber)
.map((k, v) -> v)
.to(smsMessageStream);
// Send all Push messages from addressedMessages
// (CustomerId is later resolved to a list of customer's mobile devices)
addressedMessages
.filter((e, kv) -> kv.key instanceof CustomerId)
.map((k, v) -> v)
.to(customerPushMessageStream);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment