Skip to content

Instantly share code, notes, and snippets.

@nambrot
Created December 2, 2017 23:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nambrot/d3e1252de398b1c3a48b66841d190a46 to your computer and use it in GitHub Desktop.
Save nambrot/d3e1252de398b1c3a48b66841d190a46 to your computer and use it in GitHub Desktop.
SlidingWindows window = SlidingWindows.of(Duration.standardMinutes(60)).every(Duration.standardMinutes(5));
// create the sources and map them into POJOs (and window them by the sliding window)
PCollection<KV<Integer, BloodPressureEvent>> bpEvents =
p.apply(Create.of(bloodPressureRawEvents))
.apply(ParDo.of(new BloodPressureEventCaster()))
.apply(Window.into(window))
.apply(MapElements.via((BloodPressureEvent event) -> KV.of(event.userId, event)).withOutputType(new TypeDescriptor<KV<Integer, BloodPressureEvent>>() {}));
PCollection<KV<Integer, HeartRateEvent>> hrEvents =
p.apply(Create.of(heartRateRawEvents))
.apply(ParDo.of(new HeartRateEventCaster()))
.apply(Window.into(window))
.apply(MapElements.via((HeartRateEvent event) -> KV.of(event.userId, event)).withOutputType(new TypeDescriptor<KV<Integer, HeartRateEvent>>() {}));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment