@nambrot
Created December 2, 2017 23:37
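// Assumes the Apache Beam Java SDK and Guava (FluentIterable, Lists); Instant is presumably Joda-Time's.
// bpEvents and hrEvents are PCollections of KV pairs keyed by the Integer user ID.
final TupleTag<BloodPressureEvent> bpTag = new TupleTag<>();
final TupleTag<HeartRateEvent> hrTag = new TupleTag<>();

// Join both collections by key: each output element holds all events for one user.
PCollection<KV<Integer, CoGbkResult>> joined =
    KeyedPCollectionTuple.of(bpTag, bpEvents)
        .and(hrTag, hrEvents)
        .apply(CoGroupByKey.create());

// Emit one Alert per user who has a systolic reading below 100 and a heart rate above 200.
// On Beam 2.x this is typically written as FlatMapElements.into(TypeDescriptor.of(Alert.class)).via(...).
PCollection<Alert> alerts = joined.apply(FlatMapElements.via((KV<Integer, CoGbkResult> pair) -> {
    CoGbkResult result = pair.getValue();
    boolean hasLowSystolic = FluentIterable.from(result.getAll(bpTag))
        .anyMatch((BloodPressureEvent event) -> event.systolic < 100);
    boolean hasHighHeartRate = FluentIterable.from(result.getAll(hrTag))
        .anyMatch((HeartRateEvent event) -> event.heartRate > 200);
    if (hasLowSystolic && hasHighHeartRate) {
        // The timestamp is taken from the first blood-pressure event in the group,
        // which is not necessarily the low-systolic reading.
        Instant time = FluentIterable.from(result.getAll(bpTag)).first().get().timestamp;
        Alert alert = new Alert();
        alert.timestamp = time;
        alert.userId = pair.getKey();
        alert.message = "High alert detected";
        return Lists.newArrayList(alert);
    }
    // No alert for this user: FlatMapElements drops the empty list.
    return Lists.<Alert>newArrayList();
}).withOutputType(new TypeDescriptor<Alert>() {}));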
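
The per-user decision inside the lambda above can be pulled out into plain Java so it can be checked without running a pipeline. The following is only a minimal sketch under stated assumptions: `AlertRule`, `shouldAlert`, and `alertsFor` are hypothetical names that do not appear in the gist, and readings are passed as plain integer lists rather than as `BloodPressureEvent` and `HeartRateEvent` objects. The thresholds (100 and 200) and the message text are the ones used in the gist.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical helper (not part of the gist): the per-key alert rule as pure functions.
final class AlertRule {
    static final int LOW_SYSTOLIC = 100;    // gist condition: systolic < 100
    static final int HIGH_HEART_RATE = 200; // gist condition: heartRate > 200

    private AlertRule() {}

    // True when at least one systolic reading is below the threshold
    // and at least one heart-rate reading is above its threshold.
    static boolean shouldAlert(List<Integer> systolicReadings, List<Integer> heartRates) {
        boolean hasLowSystolic = systolicReadings.stream().anyMatch(s -> s < LOW_SYSTOLIC);
        boolean hasHighHeartRate = heartRates.stream().anyMatch(h -> h > HIGH_HEART_RATE);
        return hasLowSystolic && hasHighHeartRate;
    }

    // Mirrors the flat-map contract of the lambda: zero or one message per user.
    static List<String> alertsFor(List<Integer> systolicReadings, List<Integer> heartRates) {
        if (shouldAlert(systolicReadings, heartRates)) {
            return Collections.singletonList("High alert detected");
        }
        return Collections.emptyList();
    }
}
```

Because `anyMatch` on an empty stream returns false, a user with events in only one of the two collections never triggers an alert, which matches the behavior of the grouped lambda when one side of the `CoGbkResult` is empty.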