@nambrot
Created December 2, 2017 23:38
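// Group each user's alerts into session windows separated by gaps of at least one hour.
PCollection<KV<Integer, Iterable<Alert>>> sessionedAlerts = alerts
    // Have windows merged again at the next GroupByKey.
    .apply(Window.remerge())
    .apply(Window.into(Sessions.withGapDuration(Duration.standardHours(1))))
    // Key every alert by its user ID so that sessions are formed per user.
    .apply(MapElements.via((Alert event) -> KV.of(event.userId, event))
        .withOutputType(new TypeDescriptor<KV<Integer, Alert>>() {}))
    .apply(GroupByKey.create());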

// Within each user's session, keep an alert only if it is more than
// 60 minutes later than the previously kept alert.
PCollection<Alert> throttledAlerts = sessionedAlerts
    .apply(FlatMapElements.via((KV<Integer, Iterable<Alert>> pair) -> {
        ArrayList<Alert> acc = new ArrayList<>();
        FluentIterable
            .from(pair.getValue())
            // GroupByKey does not guarantee ordering, so sort by timestamp first.
            .toSortedList(Comparator.comparing(a -> a.timestamp))
            .forEach((Alert alert) -> {
                if (acc.isEmpty()
                        || Iterables.getLast(acc).timestamp.plus(Duration.standardMinutes(60)).isBefore(alert.timestamp)) {
                    acc.add(alert);
                }
            });
        return acc;
    }).withOutputType(new TypeDescriptor<Alert>() {}));
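
The throttling rule inside the `FlatMapElements` step can be tried without a Beam pipeline. The following is a minimal sketch, not the gist author's code: it assumes a hypothetical `Alert` class that has only the `userId` and `timestamp` fields the snippet reads, and it uses `java.time` in place of the Joda-Time types the pipeline appears to use. The class name `ThrottleSketch` and the method `throttle` are made up for illustration.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class ThrottleSketch {
    // Hypothetical stand-in for the gist's Alert type (assumption: only these two fields matter).
    static final class Alert {
        final int userId;
        final Instant timestamp;

        Alert(int userId, Instant timestamp) {
            this.userId = userId;
            this.timestamp = timestamp;
        }
    }

    // Keeps an alert only if it is strictly more than minGap later than the last kept alert.
    static List<Alert> throttle(List<Alert> alerts, Duration minGap) {
        // Sort a copy by timestamp so that the input list is left untouched.
        List<Alert> sorted = new ArrayList<>(alerts);
        sorted.sort(Comparator.comparing((Alert a) -> a.timestamp));

        List<Alert> kept = new ArrayList<>();
        for (Alert alert : sorted) {
            boolean farEnough = kept.isEmpty()
                || kept.get(kept.size() - 1).timestamp.plus(minGap).isBefore(alert.timestamp);
            if (farEnough) {
                kept.add(alert);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2017-12-02T00:00:00Z");
        List<Alert> alerts = Arrays.asList(
            new Alert(1, t0),
            new Alert(1, t0.plus(Duration.ofMinutes(30))),   // dropped: within 60 minutes of t0
            new Alert(1, t0.plus(Duration.ofMinutes(61))),   // kept
            new Alert(1, t0.plus(Duration.ofMinutes(90))),   // dropped: within 60 minutes of +61
            new Alert(1, t0.plus(Duration.ofMinutes(125)))); // kept
        System.out.println(throttle(alerts, Duration.ofMinutes(60)).size()); // prints 3
    }
}
```

Because the comparison is strict (`isBefore`), an alert exactly 60 minutes after the last kept one is still dropped, which matches the condition in the pipeline code.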