Apache Beam & GCP Dataflow triggers explained


Full Code Sample

We only explain the triggers here; understanding them is helpful when you design downstream storage, for example a Bigtable schema, where the classic tall-vs-wide question depends on how and when results are emitted for your use case.

Example borrowed from the Apache Beam examples cookbook (org.apache.beam.examples.cookbook.TriggerExample).
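
Before reading the full sample, here is a minimal sketch distilling the four window/trigger configurations it demonstrates (Concepts #1-#4 below); windowDuration, ONE_MINUTE, FIVE_MINUTES and ONE_DAY are the constants defined in the full code.

// Concept #1: default trigger; late data is dropped, one ON_TIME pane per window.
Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
    .withAllowedLateness(Duration.ZERO)
    .discardingFiredPanes();

// Concept #2: same trigger, but each late element fires an extra LATE pane for up to a day.
Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
    .discardingFiredPanes()
    .withAllowedLateness(ONE_DAY);

// Concept #3: speculative EARLY/LATE panes every minute of processing time, accumulated.
Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
    .triggering(Repeatedly.forever(
        AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE)))
    .accumulatingFiredPanes()
    .withAllowedLateness(ONE_DAY);

// Concept #4: EARLY panes every minute, one ON_TIME pane, then LATE panes every five minutes.
Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
    .triggering(AfterEach.inOrder(
        Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(ONE_MINUTE))
            .orFinally(AfterWatermark.pastEndOfWindow()),
        Repeatedly.forever(
            AfterProcessingTime.pastFirstElementInPane().plusDelayOf(FIVE_MINUTES))))
    .accumulatingFiredPanes()
    .withAllowedLateness(ONE_DAY);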

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.cookbook;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterEach;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
* This example illustrates the basic concepts behind triggering. It shows how to use different
* trigger definitions to produce partial (speculative) results before all the data is processed and
* to control when updated results are produced for late data. The example performs a streaming
* analysis of the data coming in from a text file and writes the results to BigQuery. It divides
* the data into {@link Window windows} to be processed, and demonstrates using various kinds of
* {@link org.apache.beam.sdk.transforms.windowing.Trigger triggers} to control when the results for
* each window are emitted.
*
* <p>This example uses a portion of real traffic data from San Diego freeways. It contains readings
* from sensor stations set up along each freeway. Each sensor reading includes a calculation of the
* 'total flow' across all lanes in that freeway direction.
*
* <p>Concepts:
*
* <pre>
* 1. The default triggering behavior
* 2. Late data with the default trigger
* 3. How to get speculative estimates
* 4. Combining late data and speculative estimates
* </pre>
*
* <p>Before running this example, it will be useful to familiarize yourself with Beam triggers and
* understand the concept of 'late data'. See: <a
* href="https://beam.apache.org/documentation/programming-guide/#triggers">
* https://beam.apache.org/documentation/programming-guide/#triggers</a>
*
* <p>The example is configured to use the default BigQuery table from the example common package
* (there are no defaults for a general Beam pipeline). You can override them by using the {@code
* --bigQueryDataset} and {@code --bigQueryTable} options. If the BigQuery table does not exist,
* the example will try to create it.
*
* <p>The pipeline outputs its results to a BigQuery table. Here are some queries you can use to see
* interesting results: Replace {@code <enter_table_name>} in the query below with the name of the
* BigQuery table. Replace {@code <enter_window_interval>} in the query below with the window
* interval.
*
* <p>To see the results of the default trigger (note: when you start up your pipeline, you'll
* initially see results from 'late' data; wait until after the window duration, when the first
* pane of non-late data has been emitted, to see more interesting results): {@code SELECT * FROM
* <enter_table_name> WHERE trigger_type = "default" ORDER BY window DESC}
*
* <p>To see the late data, i.e. data that the default trigger would drop, {@code SELECT * FROM
* <enter_table_name> WHERE trigger_type = "withAllowedLateness" and (timing = "LATE" or timing =
* "ON_TIME") and freeway = "5" ORDER BY window DESC, processing_time}
*
* <p>To see the difference between accumulation mode and discarding mode, {@code SELECT * FROM
* <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND (trigger_type =
* "withAllowedLateness" or trigger_type = "sequential") and freeway = "5" ORDER BY window DESC,
* processing_time}
*
* <p>To see speculative results every minute, {@code SELECT * FROM <enter_table_name> WHERE
* trigger_type = "speculative" and freeway = "5" ORDER BY window DESC, processing_time}
*
* <p>To see speculative results every five minutes after the end of the window, {@code SELECT * FROM
* <enter_table_name> WHERE trigger_type = "sequential" and timing != "EARLY" and freeway = "5"
* ORDER BY window DESC, processing_time}
*
* <p>To see the first and the last pane for a freeway in a window for all the trigger types, {@code
* SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window}
*
* <p>To reduce the number of results for each query, we can add additional WHERE clauses. For
* example, to see the results of the default trigger, {@code SELECT * FROM <enter_table_name>
* WHERE trigger_type = "default" AND freeway = "5" AND window = "<enter_window_interval>"}
*
* <p>The example will try to cancel the pipeline on the signal to terminate the process (CTRL-C)
* and then exit.
*/
public class TriggerExample {
// Numeric value of fixed window duration, in minutes
public static final int WINDOW_DURATION = 30;
// Constants used in triggers.
// Shortening ONE_MINUTE or FIVE_MINUTES lets you see early approximations of results sooner.
// ONE_MINUTE is used only with processing time before the end of the window
public static final Duration ONE_MINUTE = Duration.standardMinutes(1);
// FIVE_MINUTES is used only with processing time after the end of the window
public static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
// ONE_DAY is used to specify the amount of lateness allowed for the data elements.
public static final Duration ONE_DAY = Duration.standardDays(1);
/**
* This transform demonstrates using triggers to control when data is produced for each window.
* Consider an example to understand the results generated by each type of trigger. The example
* uses "freeway" as the key. Event time is the timestamp associated with the data element and
* processing time is the time when the data element gets processed in the pipeline. For freeway
* 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window:
*
* <pre>
* Key (freeway) | Value (total_flow) | event time | processing time
* 5             | 50                 | 10:00:03   | 10:00:47
* 5             | 30                 | 10:01:00   | 10:01:03
* 5             | 30                 | 10:02:00   | 11:07:00
* 5             | 20                 | 10:04:10   | 10:05:15
* 5             | 60                 | 10:05:00   | 11:03:00
* 5             | 20                 | 10:05:01   | 11:07:30
* 5             | 60                 | 10:15:00   | 10:27:15
* 5             | 40                 | 10:26:40   | 10:26:43
* 5             | 60                 | 10:27:20   | 10:27:25
* 5             | 60                 | 10:29:00   | 11:11:00
* </pre>
*
* <p>Beam tracks a watermark which records up to what point in event time the data is complete.
* For the purposes of the example, we'll assume the watermark is approximately 15m behind the
* current processing time. In practice, the actual value would vary over time based on the
* system's knowledge of the current delay and the contents of the backlog (data that has not yet been
* processed).
*
* <p>If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would
* close at 10:44:59, when the watermark passes 10:30:00.
*/
static class CalculateTotalFlow
extends PTransform<PCollection<KV<String, Integer>>, PCollectionList<TableRow>> {
private int windowDuration;
CalculateTotalFlow(int windowDuration) {
this.windowDuration = windowDuration;
}
@Override
public PCollectionList<TableRow> expand(PCollection<KV<String, Integer>> flowInfo) {
// Concept #1: The default triggering behavior
// By default Beam uses a trigger which fires when the watermark has passed the end of the
// window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
// The system also defaults to dropping late data -- data which arrives after the watermark
// has passed the event timestamp of the arriving element. This means that the default trigger
// will only fire once.
// Each pane produced by the default trigger with no allowed lateness will be the first and
// last pane in the window, and will be ON_TIME.
// The results for the example above with the default trigger and zero allowed lateness
// would be:
// Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
// 5 | 260 | 6 | true | true | ON_TIME
// At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
// result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
// late, and dropped.
PCollection<TableRow> defaultTriggerResults =
flowInfo
.apply(
"Default",
Window
// The default window duration values work well if you're running the default input
// file. You may want to adjust the window duration otherwise.
.<KV<String, Integer>>into(
FixedWindows.of(Duration.standardMinutes(windowDuration)))
// The default trigger first emits output when the system's watermark passes
// the end of the window.
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
// Late data is dropped
.withAllowedLateness(Duration.ZERO)
// Discard elements after emitting each pane.
// With no allowed lateness and the specified trigger there will only be a single
// pane, so this doesn't have a noticeable effect. See concept 2 for more details.
.discardingFiredPanes())
.apply(new TotalFlow("default"));
// Concept #2: Late data with the default trigger
// This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This
// leads to each window staying open for ONE_DAY after the watermark has passed the end of the
// window. Any late data will result in an additional pane being fired for that same window.
// The first pane produced will be ON_TIME and the remaining panes will be LATE.
// To definitely get the last pane when the window closes, use
// .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS).
// The results for the example above with the default trigger and ONE_DAY allowed lateness
// would be:
// Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
// 5 | 260 | 6 | true | false | ON_TIME
// 5 | 60 | 1 | false | false | LATE
// 5 | 30 | 1 | false | false | LATE
// 5 | 20 | 1 | false | false | LATE
// 5 | 60 | 1 | false | false | LATE
PCollection<TableRow> withAllowedLatenessResults =
flowInfo
.apply(
"WithLateData",
Window.<KV<String, Integer>>into(
FixedWindows.of(Duration.standardMinutes(windowDuration)))
// Late data is emitted as it arrives
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
// Once the output is produced, the pane is dropped and we start preparing the next
// pane for the window.
.discardingFiredPanes()
// Late data is handled up to one day
.withAllowedLateness(ONE_DAY))
.apply(new TotalFlow("withAllowedLateness"));
// Concept #3: How to get speculative estimates
// We can specify a trigger that fires independent of the watermark, for instance after
// ONE_MINUTE of processing time. This allows us to produce speculative estimates before
// all the data is available. Since we don't have any triggers that depend on the watermark
// we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE.
// We also use accumulatingFiredPanes to build up the results across each pane firing.
// The results for the example above for this trigger would be:
// Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
// 5 | 80 | 2 | true | false | EARLY
// 5 | 100 | 3 | false | false | EARLY
// 5 | 260 | 6 | false | false | EARLY
// 5 | 320 | 7 | false | false | LATE
// 5 | 370 | 9 | false | false | LATE
// 5 | 430 | 10 | false | false | LATE
PCollection<TableRow> speculativeResults =
flowInfo
.apply(
"Speculative",
Window.<KV<String, Integer>>into(
FixedWindows.of(Duration.standardMinutes(windowDuration)))
// Trigger fires every minute.
.triggering(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
// Speculative every ONE_MINUTE
.plusDelayOf(ONE_MINUTE)))
// After emitting each pane, it will continue accumulating the elements so that each
// approximation includes all of the previous data in addition to the newly arrived data.
.accumulatingFiredPanes()
.withAllowedLateness(ONE_DAY))
.apply(new TotalFlow("speculative"));
// Concept #4: Combining late data and speculative estimates
// We can put the previous concepts together to get EARLY estimates, an ON_TIME result,
// and LATE updates based on late data.
// Each time a triggering condition is satisfied, it advances to the next trigger.
// If there are new elements, this trigger emits output under the following conditions:
// > Early approximations every minute until the end of the window.
// > An on-time firing when the watermark has passed the end of the window.
// > Late firings every five minutes while late data continues to arrive.
// Every pane produced will either be EARLY, ON_TIME or LATE.
// The results for the example above for this trigger would be:
// Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
// 5 | 80 | 2 | true | false | EARLY
// 5 | 100 | 3 | false | false | EARLY
// 5 | 260 | 6 | false | false | EARLY
// [First pane fired after the end of the window]
// 5 | 320 | 7 | false | false | ON_TIME
// 5 | 430 | 10 | false | false | LATE
// For more possibilities of how to build advanced triggers, see {@link Trigger}.
PCollection<TableRow> sequentialResults =
flowInfo
.apply(
"Sequential",
Window.<KV<String, Integer>>into(
FixedWindows.of(Duration.standardMinutes(windowDuration)))
.triggering(
AfterEach.inOrder(
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
// Speculative every ONE_MINUTE
.plusDelayOf(ONE_MINUTE))
.orFinally(AfterWatermark.pastEndOfWindow()),
Repeatedly.forever(
AfterProcessingTime.pastFirstElementInPane()
// Late data every FIVE_MINUTES
.plusDelayOf(FIVE_MINUTES))))
.accumulatingFiredPanes()
// For up to ONE_DAY
.withAllowedLateness(ONE_DAY))
.apply(new TotalFlow("sequential"));
// Adds the results generated by each trigger type to a PCollectionList.
PCollectionList<TableRow> resultsList =
PCollectionList.of(defaultTriggerResults)
.and(withAllowedLatenessResults)
.and(speculativeResults)
.and(sequentialResults);
return resultsList;
}
}
//////////////////////////////////////////////////////////////////////////////////////////////////
// The remaining parts of the pipeline are needed to produce the output for each
// concept above. Not directly relevant to understanding the trigger examples.
/**
* Calculate total flow and number of records for each freeway and format the results to TableRow
* objects, to save to BigQuery.
*/
static class TotalFlow
extends PTransform<PCollection<KV<String, Integer>>, PCollection<TableRow>> {
private String triggerType;
public TotalFlow(String triggerType) {
this.triggerType = triggerType;
}
@Override
public PCollection<TableRow> expand(PCollection<KV<String, Integer>> flowInfo) {
PCollection<KV<String, Iterable<Integer>>> flowPerFreeway =
flowInfo.apply(GroupByKey.create());
PCollection<KV<String, String>> results =
flowPerFreeway.apply(
ParDo.of(
new DoFn<KV<String, Iterable<Integer>>, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Iterable<Integer> flows = c.element().getValue();
int sum = 0;
long numberOfRecords = 0;
for (Integer value : flows) {
sum += value;
numberOfRecords++;
}
c.output(KV.of(c.element().getKey(), sum + "," + numberOfRecords));
}
}));
PCollection<TableRow> output = results.apply(ParDo.of(new FormatTotalFlow(triggerType)));
return output;
}
}
/**
* Format the results of the Total flow calculation to a TableRow, to save to BigQuery. Adds the
* triggerType, pane information, processing time and the window timestamp.
*/
static class FormatTotalFlow extends DoFn<KV<String, String>, TableRow> {
private String triggerType;
public FormatTotalFlow(String triggerType) {
this.triggerType = triggerType;
}
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
String[] values = c.element().getValue().split(",", -1);
TableRow row =
new TableRow()
.set("trigger_type", triggerType)
.set("freeway", c.element().getKey())
.set("total_flow", Integer.parseInt(values[0]))
.set("number_of_records", Long.parseLong(values[1]))
.set("window", window.toString())
.set("isFirst", c.pane().isFirst())
.set("isLast", c.pane().isLast())
.set("timing", c.pane().getTiming().toString())
.set("event_time", c.timestamp().toString())
.set("processing_time", Instant.now().toString());
c.output(row);
}
}
/**
* Extract the freeway and total flow in a reading. Freeway is used as key since we are
* calculating the total flow for each freeway.
*/
static class ExtractFlowInfo extends DoFn<String, KV<String, Integer>> {
private static final int VALID_NUM_FIELDS = 50;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String[] laneInfo = c.element().split(",", -1);
if ("timestamp".equals(laneInfo[0])) {
// Header row
return;
}
if (laneInfo.length < VALID_NUM_FIELDS) {
// Skip the invalid input.
return;
}
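// Column 2 of the CSV holds the freeway number; column 7 holds the total flow reading.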
String freeway = laneInfo[2];
Integer totalFlow = tryIntegerParse(laneInfo[7]);
// Ignore records with total flow 0 to make the trigger behavior easier to follow.
// Skip the records with total flow -1 since they are invalid input.
if (totalFlow == null || totalFlow <= 0) {
return;
}
c.output(KV.of(freeway, totalFlow));
}
}
/** Inherits standard configuration options. */
public interface TrafficFlowOptions
extends ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions {
@Description("Input file to read from")
@Default.String(
"gs://apache-beam-samples/traffic_sensor/" + "Freeways-5Minaa2010-01-01_to_2010-02-15.csv")
String getInput();
void setInput(String value);
@Description("Numeric value of window duration for fixed windows, in minutes")
@Default.Integer(WINDOW_DURATION)
Integer getWindowDuration();
void setWindowDuration(Integer value);
}
public static void main(String[] args) throws Exception {
TrafficFlowOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(TrafficFlowOptions.class);
options.setStreaming(true);
options.setBigQuerySchema(getSchema());
ExampleUtils exampleUtils = new ExampleUtils(options);
exampleUtils.setup();
Pipeline pipeline = Pipeline.create(options);
TableReference tableRef =
getTableReference(
options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable());
PCollectionList<TableRow> resultList =
pipeline
.apply("ReadMyFile", TextIO.read().from(options.getInput()))
.apply("InsertRandomDelays", ParDo.of(new InsertDelays()))
.apply(ParDo.of(new ExtractFlowInfo()))
.apply(new CalculateTotalFlow(options.getWindowDuration()));
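// Write the results from each trigger type to the same BigQuery table; the
// trigger_type column distinguishes the rows.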
for (int i = 0; i < resultList.size(); i++) {
resultList.get(i).apply(BigQueryIO.writeTableRows().to(tableRef).withSchema(getSchema()));
}
PipelineResult result = pipeline.run();
// ExampleUtils will try to cancel the pipeline and the injector before the program exits.
exampleUtils.waitToFinish(result);
}
/** Add current time to each record. Also insert a delay at random to demo the triggers. */
public static class InsertDelays extends DoFn<String, String> {
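// Fraction of elements given an artificial delay: roughly 1 in 1,000.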
private static final double THRESHOLD = 0.001;
// MIN_DELAY and MAX_DELAY in minutes.
private static final int MIN_DELAY = 1;
private static final int MAX_DELAY = 100;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Instant timestamp = Instant.now();
Random random = new Random();
if (random.nextDouble() < THRESHOLD) {
int range = MAX_DELAY - MIN_DELAY;
int delayInMinutes = random.nextInt(range) + MIN_DELAY;
long delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes);
timestamp = new Instant(timestamp.getMillis() - delayInMillis);
}
c.outputWithTimestamp(c.element(), timestamp);
}
}
/** Sets the table reference. */
private static TableReference getTableReference(String project, String dataset, String table) {
TableReference tableRef = new TableReference();
tableRef.setProjectId(project);
tableRef.setDatasetId(dataset);
tableRef.setTableId(table);
return tableRef;
}
/** Defines the BigQuery schema used for the output. */
private static TableSchema getSchema() {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("trigger_type").setType("STRING"));
fields.add(new TableFieldSchema().setName("freeway").setType("STRING"));
fields.add(new TableFieldSchema().setName("total_flow").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("number_of_records").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("window").setType("STRING"));
fields.add(new TableFieldSchema().setName("isFirst").setType("BOOLEAN"));
fields.add(new TableFieldSchema().setName("isLast").setType("BOOLEAN"));
fields.add(new TableFieldSchema().setName("timing").setType("STRING"));
fields.add(new TableFieldSchema().setName("event_time").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("processing_time").setType("TIMESTAMP"));
TableSchema schema = new TableSchema().setFields(fields);
return schema;
}
private static Integer tryIntegerParse(String number) {
try {
return Integer.parseInt(number);
} catch (NumberFormatException e) {
return null;
}
}
}
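
The sample above runs against live data and BigQuery. If you just want to watch the triggers fire locally, one option is Beam's TestStream on the direct runner, which lets you script element arrival and watermark movement by hand. The following is a hypothetical JUnit 4 sketch, not part of the original example (class and method names are made up; assumes beam-runners-direct-java and junit on the test classpath). It replays a miniature version of the javadoc scenario: one on-time element, the watermark passing the end of a 30-minute window, then a late element that fires an extra accumulated pane.

import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;

public class TriggerSketchTest {
  @Rule public final transient TestPipeline p = TestPipeline.create();

  @Test
  public void lateElementFiresAnExtraAccumulatedPane() {
    Instant start = new Instant(0);
    TestStream<KV<String, Integer>> events =
        TestStream.create(KvCoder.of(StringUtf8Coder.of(), VarIntCoder.of()))
            // One element inside the [00:00, 00:30) window, delivered on time.
            .addElements(
                TimestampedValue.of(KV.of("5", 50), start.plus(Duration.standardMinutes(1))))
            // Advance the watermark past the end of the window: the ON_TIME pane fires.
            .advanceWatermarkTo(start.plus(Duration.standardMinutes(31)))
            // A late element for the same window, arriving after the watermark has passed.
            .addElements(
                TimestampedValue.of(KV.of("5", 60), start.plus(Duration.standardMinutes(2))))
            .advanceWatermarkToInfinity();

    PCollection<KV<String, Integer>> sums =
        p.apply(events)
            .apply(
                Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(30)))
                    .triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
                    .withAllowedLateness(Duration.standardDays(1))
                    .accumulatingFiredPanes())
            .apply(Sum.integersPerKey());

    IntervalWindow window = new IntervalWindow(start, Duration.standardMinutes(30));
    // The ON_TIME pane holds only the on-time element.
    PAssert.that(sums).inOnTimePane(window).containsInAnyOrder(KV.of("5", 50));
    // Accumulating mode: the LATE pane repeats the earlier data plus the late element,
    // so across all panes the window emits both a 50 and a 110 total.
    PAssert.that(sums).inWindow(window).containsInAnyOrder(KV.of("5", 50), KV.of("5", 110));

    p.run().waitUntilFinish();
  }
}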