Skip to content

Instantly share code, notes, and snippets.

@ddebrunner
Created March 5, 2019 21:17
Show Gist options
  • Save ddebrunner/5d4ef21c255c1d40a4517a0060ff8b99 to your computer and use it in GitHub Desktop.
Save ddebrunner/5d4ef21c255c1d40a4517a0060ff8b99 to your computer and use it in GitHub Desktop.
Cascading windows in Apache Beam.
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Random;
import java.util.function.Supplier;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.json.simple.parser.ParseException;
public class CascadeWindows {
/**
 * Pass-through step: every element received is emitted again unchanged.
 *
 * @param <T> type of the elements flowing through this step
 */
private static class Noop<T> extends DoFn<T, T> {
    /** Forwards the current input element to the output as-is. */
    @ProcessElement
    public void processElement(ProcessContext context) {
        T element = context.element();
        context.output(element);
    }
}
/**
 * Builds and runs the sample pipeline with two cascaded fixed windows.
 *
 * <p>Steps, in the order they are applied below: create timestamped readings
 * from {@link SimulateReadings}, key them by reading id, window them ("W1")
 * and group by key into {@link Collected} lists, then re-window ("W2") and
 * combine all collected lists into one {@link All} per window. The source,
 * the W1 output and the W2 output are each printed through a
 * {@link PrintDoFn} branch.
 *
 * @param args command-line arguments handed to {@link PipelineOptionsFactory}
 * @throws IOException declared by the signature; no statement in this method
 *         visibly throws it
 * @throws ParseException declared by the signature; no statement in this
 *         method visibly throws it
 */
public static void main(String args[]) throws IOException, ParseException {
// Create options
PipelineOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation().create();
// Name the job, create the pipeline and register coders for the sample datatypes.
options.setJobName("Readings");
Pipeline pipeline = Pipeline.create(options);
registerCoders(pipeline);
// Source: the simulated readings, each carrying its own timestamp.
PCollection<Reading> rawReadings = pipeline
.apply(Create.timestamped(new SimulateReadings()));
// Side branch: print every source element with the SOURCE tag.
rawReadings.apply("SO", ParDo.of(new PrintDoFn<>("SOURCE")));
// Key each reading by its id so that W1 can group per id.
PCollection<KV<Integer, Reading>> readings = rawReadings
.apply(WithKeys.<Integer, Reading>of(t -> t.getId())
.withKeyType(TypeDescriptor.of(Integer.class)));
/*
* W1: Aggregate by key (id) with early firings in the window.
*
* Sample code that just aggregates Reading elements into
* a collected list. (Real app would perform aggregation to reduce data size).
*/
Duration period = Duration.standardMinutes(1);
// Fire at the end of the window, plus an early pane once at least 5 elements are buffered.
Trigger w1Trigger = AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterPane.elementCountAtLeast(5));
// One-minute fixed windows, 30 seconds of allowed lateness, discarding fired panes.
readings = readings.apply("W1",
Window.<KV<Integer, Reading>>into(FixedWindows.of(period))
.withAllowedLateness(Duration.standardSeconds(30))
.triggering(w1Trigger).discardingFiredPanes());
PCollection<KV<Integer, Iterable<Reading>>> groupedReadings = readings
.apply(GroupByKey.<Integer, Reading>create());
// Copy each grouped pane into a Collected holder tagged with the key;
// a pane without any readings produces no output element.
PCollection<Collected<Reading>> collectedReadings = groupedReadings
.apply(ParDo.of(
new DoFn<KV<Integer, Iterable<Reading>>, Collected<Reading>>() {
@ProcessElement
public void processElement(ProcessContext context) {
Collected<Reading> readings = new Collected<Reading>(
context.element().getKey());
for (Reading reading : context.element()
.getValue()) {
readings.getItems().add(reading);
}
if (!readings.getItems().isEmpty())
context.output(readings);
}
}));
// Side branch: print every W1 result with the W1OUT tag.
collectedReadings.apply("CO", ParDo.of(new PrintDoFn<>("W1OUT")));
// All ok up to here.
// EXAMPLE output for W1OUT:
/*
* W1OUT:C0:[R0:71, R0:11, R0:60, R0:45, R0:40] W1OUT:C3:[R3:2, R3:53,
* R3:20, R3:58, R3:32] W1OUT:C0:[R0:80, R0:72] W1OUT:C1:[R1:10, R1:85,
* R1:26, R1:20] W1OUT:C2:[R2:64, R2:70] W1OUT:C4:[R4:43, R4:76]
*/
/*
* W2: Now aggregate across all elements in a window of the same size
* so a single result is produced across all keys.
*
* Sample app just collects the aggregated collected lists into a single list.
* Real app would do processing across the elements and keys.
*/
/*
* Expected output when using pastEndOfWindowTrigger
*
* W2OUT:A:[R0:71, R0:11, R0:60, R0:45, R0:40, R3:2, R3:53, R3:20,
* R3:58, R3:32, R0:80, R0:72, R1:10, R1:85, R1:26, R1:20, R2:64, R2:70,
* R4:43, R4:76]
*/
// NOTE(review): W2 sets neither allowed lateness nor an accumulation mode;
// presumably both carry over from W1's windowing strategy - confirm.
Trigger w2Trigger = AfterWatermark.pastEndOfWindow();
collectedReadings = collectedReadings.apply("W2",
Window.<Collected<Reading>>into(FixedWindows.of(period))
.triggering(w2Trigger));
/*
* UNEXPECTED output when using the default trigger for W2.
*
* W2OUT:A:[R0:71, R0:11, R0:60, R0:45, R0:40] W2OUT:A:[R3:2, R3:53,
* R3:20, R3:58, R3:32] W2OUT:A:[R0:80, R0:72] W2OUT:A:[R1:10, R1:85,
* R1:26, R1:20] W2OUT:A:[R2:64, R2:70] W2OUT:A:[R4:43, R4:76]
* W2OUT:A:[]
*/
// collectedReadings = collectedReadings.apply("W2",
// Window.<Collected<Reading>>into(FixedWindows.of(period)));
// Combine every Collected list into a single All by concatenating the items.
// The accumulator type and the output type are both All<Reading>.
PCollection<All<Reading>> allReadings = collectedReadings
.apply(Combine.<Collected<Reading>, All<Reading>>globally(
new Combine.CombineFn<Collected<Reading>, All<Reading>, All<Reading>>() {
@Override
public All<Reading> createAccumulator() {
return new All<Reading>();
}
@Override
public All<Reading> addInput(
All<Reading> accumulator,
Collected<Reading> input) {
accumulator.getItems().addAll(input.getItems());
return accumulator;
}
@Override
public All<Reading> mergeAccumulators(
Iterable<All<Reading>> accumulators) {
// Merge into a fresh All rather than mutating one of the inputs.
All<Reading> merge = new All<Reading>();
for (All<Reading> accumulator : accumulators)
merge.getItems()
.addAll(accumulator.getItems());
return merge;
}
@Override
public All<Reading> extractOutput(
All<Reading> accumulator) {
return accumulator;
}
}).withoutDefaults());
// Side branch: print every W2 result with the W2OUT tag.
allReadings.apply("W2O", ParDo.of(new PrintDoFn<>("W2OUT")));
// Start the pipeline and block until it has finished.
pipeline.run().waitUntilFinish();
}
/*
* Coder registration, simple datatypes, the reading simulator and the
* printing DoFn follow below.
*/
/**
 * Registers a {@link SerializableCoder} with the pipeline's coder registry for
 * each of the sample datatypes: {@link Reading}, {@link Collected} and
 * {@link All}.
 *
 * @param pipeline pipeline whose coder registry receives the registrations
 */
public static void registerCoders(Pipeline pipeline) {
    pipeline.getCoderRegistry()
            .registerCoderForClass(Reading.class, SerializableCoder.of(Reading.class));
    pipeline.getCoderRegistry()
            .registerCoderForClass(Collected.class, SerializableCoder.of(Collected.class));
    pipeline.getCoderRegistry()
            .registerCoderForClass(All.class, SerializableCoder.of(All.class));
}
/**
 * Creates one random {@link Reading} and pairs it with an event timestamp
 * taken from the reading's own {@code ts} field.
 *
 * @param rand random source passed on to {@link Reading#newReading(Random)}
 * @return the new reading wrapped together with its timestamp
 */
public static TimestampedValue<Reading> newReading(Random rand) {
    Reading generated = Reading.newReading(rand);
    Instant eventTime = new Instant(generated.getTs());
    return TimestampedValue.of(generated, eventTime);
}
/**
 * Finite source of simulated, timestamped readings.
 *
 * <p>Each call to {@link #iterator()} starts a new {@link Random} with the same
 * fixed seed and yields {@value #READING_COUNT} readings created through
 * {@code newReading(Random)}.
 */
public static class SimulateReadings
        implements Iterable<TimestampedValue<Reading>> {

    /** Number of readings produced by one iterator. */
    private static final int READING_COUNT = 20;

    /** Fixed seed used for the random generator of every iterator. */
    private static final long SEED = 12345678L;

    /**
     * Returns a fresh iterator over {@value #READING_COUNT} newly generated readings.
     *
     * @return iterator whose {@code next()} throws {@link NoSuchElementException}
     *         once all readings have been handed out
     */
    @Override
    public Iterator<TimestampedValue<Reading>> iterator() {
        final Random rand = new Random(SEED);
        return new Iterator<TimestampedValue<Reading>>() {
            private int count = 0;

            @Override
            public boolean hasNext() {
                return count < READING_COUNT;
            }

            @Override
            public TimestampedValue<Reading> next() {
                // Honor the Iterator contract instead of generating readings
                // beyond the advertised end of the sequence.
                if (!hasNext()) {
                    throw new NoSuchElementException(
                            "No more simulated readings after " + READING_COUNT);
                }
                count++;
                return newReading(rand);
            }
        };
    }
}
/**
 * Immutable sample measurement consisting of an id, a value and a timestamp
 * in milliseconds. Equality and hash code take all three fields into account.
 */
public static class Reading implements Serializable {
    private final int id;
    private final int value;
    private final long ts;

    /**
     * @param id    identifier of the reading
     * @param value measured value
     * @param ts    timestamp in milliseconds
     */
    public Reading(int id, int value, long ts) {
        this.id = id;
        this.value = value;
        this.ts = ts;
    }

    /**
     * Creates a reading with an id drawn from {@code [0, 5)}, a value drawn
     * from {@code [0, 100)} and the current system time as timestamp.
     *
     * @param rand random source; the id is drawn first, then the value
     * @return the newly created reading
     */
    public static Reading newReading(Random rand) {
        int randomId = rand.nextInt(5);
        int randomValue = rand.nextInt(100);
        long now = System.currentTimeMillis();
        return new Reading(randomId, randomValue, now);
    }

    public int getId() {
        return id;
    }

    public int getValue() {
        return value;
    }

    public long getTs() {
        return ts;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (obj == null || obj.getClass() != getClass()) {
            return false;
        }
        Reading that = (Reading) obj;
        return that.id == id && that.ts == ts && that.value == value;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, ts, value);
    }

    /** Renders the reading as {@code R<id>:<value>}; the timestamp is omitted. */
    @Override
    public String toString() {
        return "R" + id + ":" + value;
    }
}
/**
 * Holder for the items collected under one id.
 *
 * <p>The item list is mutable and exposed directly through {@link #getItems()};
 * callers fill it by adding to that list. Equality and hash code are based on
 * the id and the item list.
 *
 * @param <T> type of the collected items
 */
public static class Collected<T> implements Serializable {
    private final int id;
    private final List<T> items = new ArrayList<>();

    /**
     * Creates an empty collection for the given id.
     *
     * @param id identifier the collected items belong to
     */
    public Collected(int id) {
        this.id = id;
    }

    public int getId() {
        return id;
    }

    /**
     * @return the live, mutable list of collected items (never {@code null})
     */
    public List<T> getItems() {
        return items;
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, items);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        // Wildcard instead of a raw type: the element type is irrelevant here
        // because List.equals compares the elements themselves.
        Collected<?> other = (Collected<?>) obj;
        return id == other.id && Objects.equals(items, other.items);
    }

    /** Renders the holder as {@code C<id>:<items>}. */
    @Override
    public String toString() {
        return "C" + id + ":" + items;
    }
}
/**
 * Holder for a single list of items gathered across all keys.
 *
 * <p>The item list is mutable and exposed directly through {@link #getItems()};
 * callers fill it by adding to that list. Equality and hash code are based on
 * the item list only.
 *
 * @param <T> type of the gathered items
 */
public static class All<T> implements Serializable {
    private final List<T> items = new ArrayList<>();

    /**
     * @return the live, mutable list of gathered items (never {@code null})
     */
    public List<T> getItems() {
        return items;
    }

    @Override
    public int hashCode() {
        return Objects.hash(items);
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        // Wildcard instead of a raw type: the element type is irrelevant here
        // because List.equals compares the elements themselves.
        All<?> other = (All<?>) obj;
        return Objects.equals(items, other.items);
    }

    /** Renders the holder as {@code A:<items>}. */
    @Override
    public String toString() {
        return "A:" + items;
    }
}
/**
 * A {@link DoFn} that writes each element it receives to {@code System.out},
 * prefixed with a fixed tag and a colon. It does not emit any output elements.
 *
 * @param <T> type of the elements to print
 */
private static class PrintDoFn<T> extends DoFn<T, Void> {
    private final String tag;

    /**
     * @param tag label placed in front of every printed element
     */
    public PrintDoFn(String tag) {
        this.tag = tag;
    }

    /** Prints {@code <tag>:<element>} on its own line. */
    @ProcessElement
    public void processElement(ProcessContext context) {
        String line = tag + ":" + context.element();
        System.out.println(line);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment