Created
March 5, 2019 21:17
-
-
Save ddebrunner/5d4ef21c255c1d40a4517a0060ff8b99 to your computer and use it in GitHub Desktop.
Cascading windows in Apache Beam.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Random;
import java.util.function.Supplier;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Trigger;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.json.simple.parser.ParseException;
public class CascadeWindows { | |
private static class Noop<T> extends DoFn<T, T> { | |
@ProcessElement | |
public void processElement(ProcessContext c) { | |
c.output(c.element()); | |
} | |
} | |
public static void main(String args[]) throws IOException, ParseException { | |
// Create options | |
PipelineOptions options = PipelineOptionsFactory.fromArgs(args) | |
.withValidation().create(); | |
// Run the consumer | |
options.setJobName("Readings"); | |
Pipeline pipeline = Pipeline.create(options); | |
registerCoders(pipeline); | |
PCollection<Reading> rawReadings = pipeline | |
.apply(Create.timestamped(new SimulateReadings())); | |
rawReadings.apply("SO", ParDo.of(new PrintDoFn<>("SOURCE"))); | |
PCollection<KV<Integer, Reading>> readings = rawReadings | |
.apply(WithKeys.<Integer, Reading>of(t -> t.getId()) | |
.withKeyType(TypeDescriptor.of(Integer.class))); | |
/* | |
* W1: Aggregate by key (id) with early firings in the window. | |
* | |
* Sample code that just aggregates Reading elements into | |
* a collected list. (Real app would perform aggregation to reduce data size). | |
*/ | |
Duration period = Duration.standardMinutes(1); | |
Trigger w1Trigger = AfterWatermark.pastEndOfWindow() | |
.withEarlyFirings(AfterPane.elementCountAtLeast(5)); | |
readings = readings.apply("W1", | |
Window.<KV<Integer, Reading>>into(FixedWindows.of(period)) | |
.withAllowedLateness(Duration.standardSeconds(30)) | |
.triggering(w1Trigger).discardingFiredPanes()); | |
PCollection<KV<Integer, Iterable<Reading>>> groupedReadings = readings | |
.apply(GroupByKey.<Integer, Reading>create()); | |
PCollection<Collected<Reading>> collectedReadings = groupedReadings | |
.apply(ParDo.of( | |
new DoFn<KV<Integer, Iterable<Reading>>, Collected<Reading>>() { | |
@ProcessElement | |
public void processElement(ProcessContext context) { | |
Collected<Reading> readings = new Collected<Reading>( | |
context.element().getKey()); | |
for (Reading reading : context.element() | |
.getValue()) { | |
readings.getItems().add(reading); | |
} | |
if (!readings.getItems().isEmpty()) | |
context.output(readings); | |
} | |
})); | |
collectedReadings.apply("CO", ParDo.of(new PrintDoFn<>("W1OUT"))); | |
// All ok up to here. | |
// EXAMPLE output for W1OUT: | |
/* | |
* W1OUT:C0:[R0:71, R0:11, R0:60, R0:45, R0:40] W1OUT:C3:[R3:2, R3:53, | |
* R3:20, R3:58, R3:32] W1OUT:C0:[R0:80, R0:72] W1OUT:C1:[R1:10, R1:85, | |
* R1:26, R1:20] W1OUT:C2:[R2:64, R2:70] W1OUT:C4:[R4:43, R4:76] | |
*/ | |
/* | |
* W2: Now aggregate across all elements in a window of the same size | |
* so a single result is produced across all keys. | |
* | |
* Sample app just collects the aggregated collected lists into a single list. | |
* Real app would do processing across the elements and keys. | |
*/ | |
/* | |
* Expected output when using pastEndOfWindowTrigger | |
* | |
* W2OUT:A:[R0:71, R0:11, R0:60, R0:45, R0:40, R3:2, R3:53, R3:20, | |
* R3:58, R3:32, R0:80, R0:72, R1:10, R1:85, R1:26, R1:20, R2:64, R2:70, | |
* R4:43, R4:76] | |
*/ | |
Trigger w2Trigger = AfterWatermark.pastEndOfWindow(); | |
collectedReadings = collectedReadings.apply("W2", | |
Window.<Collected<Reading>>into(FixedWindows.of(period)) | |
.triggering(w2Trigger)); | |
/* | |
* UNEXPECTED output when using the default trigger for W2. | |
* | |
* W2OUT:A:[R0:71, R0:11, R0:60, R0:45, R0:40] W2OUT:A:[R3:2, R3:53, | |
* R3:20, R3:58, R3:32] W2OUT:A:[R0:80, R0:72] W2OUT:A:[R1:10, R1:85, | |
* R1:26, R1:20] W2OUT:A:[R2:64, R2:70] W2OUT:A:[R4:43, R4:76] | |
* W2OUT:A:[] | |
*/ | |
// collectedReadings = collectedReadings.apply("W2", | |
// Window.<Collected<Reading>>into(FixedWindows.of(period))); | |
PCollection<All<Reading>> allReadings = collectedReadings | |
.apply(Combine.<Collected<Reading>, All<Reading>>globally( | |
new Combine.CombineFn<Collected<Reading>, All<Reading>, All<Reading>>() { | |
@Override | |
public All<Reading> createAccumulator() { | |
return new All<Reading>(); | |
} | |
@Override | |
public All<Reading> addInput( | |
All<Reading> accumulator, | |
Collected<Reading> input) { | |
accumulator.getItems().addAll(input.getItems()); | |
return accumulator; | |
} | |
@Override | |
public All<Reading> mergeAccumulators( | |
Iterable<All<Reading>> accumulators) { | |
All<Reading> merge = new All<Reading>(); | |
for (All<Reading> accumulator : accumulators) | |
merge.getItems() | |
.addAll(accumulator.getItems()); | |
return merge; | |
} | |
@Override | |
public All<Reading> extractOutput( | |
All<Reading> accumulator) { | |
return accumulator; | |
} | |
}).withoutDefaults()); | |
allReadings.apply("W2O", ParDo.of(new PrintDoFn<>("W2OUT"))); | |
pipeline.run().waitUntilFinish(); | |
} | |
/* | |
* Simple datatypes and simulator below here. | |
*/ | |
public static void registerCoders(Pipeline pipeline) { | |
pipeline.getCoderRegistry().registerCoderForClass(Reading.class, | |
SerializableCoder.<Reading>of(Reading.class)); | |
pipeline.getCoderRegistry().registerCoderForClass(Collected.class, | |
SerializableCoder.<Collected>of(Collected.class)); | |
pipeline.getCoderRegistry().registerCoderForClass(All.class, | |
SerializableCoder.<All>of(All.class)); | |
} | |
public static TimestampedValue<Reading> newReading(Random rand) { | |
Reading reading = Reading.newReading(rand); | |
return TimestampedValue.of(reading, new Instant(reading.getTs())); | |
} | |
public static class SimulateReadings | |
implements | |
Iterable<TimestampedValue<Reading>> { | |
@Override | |
public Iterator<TimestampedValue<Reading>> iterator() { | |
Random rand = new Random(12345678); | |
Supplier<TimestampedValue<Reading>> supplier = () -> newReading( | |
rand); | |
return new Iterator<TimestampedValue<Reading>>() { | |
int count = 0; | |
@Override | |
public boolean hasNext() { | |
return count < 20; | |
} | |
@Override | |
public TimestampedValue<Reading> next() { | |
count++; | |
TimestampedValue<Reading> value = supplier.get(); | |
return value; | |
} | |
}; | |
} | |
} | |
public static class Reading implements Serializable { | |
@Override | |
public int hashCode() { | |
return Objects.hash(id, ts, value); | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (this == obj) | |
return true; | |
if (obj == null) | |
return false; | |
if (getClass() != obj.getClass()) | |
return false; | |
Reading other = (Reading) obj; | |
return id == other.id && ts == other.ts && value == other.value; | |
} | |
public int getId() { | |
return id; | |
} | |
public int getValue() { | |
return value; | |
} | |
public long getTs() { | |
return ts; | |
} | |
private final int id; | |
private final int value; | |
private final long ts; | |
public Reading(int id, int value, long ts) { | |
this.id = id; | |
this.value = value; | |
this.ts = ts; | |
} | |
public String toString() { | |
return "R" + id + ":" + value; | |
} | |
public static Reading newReading(Random rand) { | |
return new Reading(rand.nextInt(5), rand.nextInt(100), | |
System.currentTimeMillis()); | |
} | |
} | |
public static class Collected<T> implements Serializable { | |
@Override | |
public int hashCode() { | |
return Objects.hash(id, items); | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (this == obj) | |
return true; | |
if (obj == null) | |
return false; | |
if (getClass() != obj.getClass()) | |
return false; | |
Collected other = (Collected) obj; | |
return id == other.id && Objects.equals(items, other.items); | |
} | |
public int getId() { | |
return id; | |
} | |
private final int id; | |
private final List<T> items = new ArrayList<>(); | |
public List<T> getItems() { | |
return items; | |
} | |
public Collected(int id) { | |
this.id = id; | |
} | |
public String toString() { | |
return "C" + id + ":" + items; | |
} | |
} | |
public static class All<T> implements Serializable { | |
@Override | |
public int hashCode() { | |
return Objects.hash(items); | |
} | |
@Override | |
public boolean equals(Object obj) { | |
if (this == obj) | |
return true; | |
if (obj == null) | |
return false; | |
if (getClass() != obj.getClass()) | |
return false; | |
All other = (All) obj; | |
return Objects.equals(items, other.items); | |
} | |
private final List<T> items = new ArrayList<>(); | |
public String toString() { | |
return "A:" + items; | |
} | |
public List<T> getItems() { | |
return items; | |
} | |
} | |
/** | |
* A simple DoFn that prints message to System.out | |
*/ | |
private static class PrintDoFn<T> extends DoFn<T, Void> { | |
private final String tag; | |
public PrintDoFn(String tag) { | |
this.tag = tag; | |
} | |
@ProcessElement | |
public void processElement(ProcessContext c) { | |
System.out.println(tag + ":" + c.element()); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment