/*
 * Copyright (C) 2015 Google Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.cloud.dataflow.examples.complete.game;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.examples.common.DataflowExampleOptions;
import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.CreateDisposition;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.options.StreamingOptions;
import com.google.cloud.dataflow.sdk.options.Validation;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.DoFn.RequiresWindowAccess;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterProcessingTime;
import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.TypeDescriptor;

import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

import java.util.ArrayList;
import java.util.List;
import java.util.TimeZone;

/**
 * This class is the third in a series of four pipelines that tell a story in a 'gaming' domain,
 * following {@link UserScore} and {@link HourlyTeamScore}. Concepts include: processing unbounded
 * data using fixed windows; use of custom timestamps and event-time processing; generation of
 * early/speculative results; and use of .accumulatingFiredPanes() to do cumulative processing of
 * late-arriving data.
 *
 * <p> This pipeline processes an unbounded stream of 'game events'. The team analysis uses fixed
 * windowing based on event time (the time of the game play event), not processing time (the time
 * that an event is processed by the pipeline). The pipeline counts the number of scoring events
 * per team, for each window; by default, one-hour windows are used.
 *
 * <p> As a contrasting windowing option, the related LeaderBoard example also calculates user
 * scores using a single global window that periodically (every ten minutes) emits cumulative
 * user score sums; this variant focuses on the fixed-window team analysis.
 *
 * <p> In contrast to the previous pipelines in the series, which used static, finite input data,
 * here we're using an unbounded data source, which lets us provide speculative results and handle
 * late data at much lower latency. We can use the early/speculative results to keep a leaderboard
 * updated in near-realtime, and our handling of late data lets us generate correct results, e.g.
 * for 'team prizes'. We're now outputting window results as they're calculated, giving us much
 * lower latency than with the previous batch examples.
 *
 * <p> Run {@link injector.Injector} to generate pubsub data for this pipeline. The Injector
 * documentation provides more detail on how to do this.
 *
 * <p> To execute this pipeline using the Dataflow service, specify the pipeline configuration
 * like this:
 * <pre>{@code
 *   --project=YOUR_PROJECT_ID
 *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
 *   --runner=BlockingDataflowPipelineRunner
 *   --dataset=YOUR-DATASET
 *   --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC
 * }
 * </pre>
 * where the BigQuery dataset you specify must already exist.
 * The PubSub topic you specify should be the same topic to which the Injector is publishing.
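 *
 * <p> A sketch of a typical invocation (assuming the examples are built with the standard
 * Maven project; adjust the main class path and options to your environment):
 * <pre>{@code
 * mvn compile exec:java \
 *   -Dexec.mainClass=com.google.cloud.dataflow.examples.complete.game.LeaderBoardCount \
 *   -Dexec.args="--project=YOUR_PROJECT_ID --stagingLocation=gs://YOUR_STAGING_DIRECTORY \
 *       --runner=BlockingDataflowPipelineRunner --dataset=YOUR-DATASET \
 *       --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC"
 * }
 * </pre>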
 */
public class LeaderBoardCount extends HourlyTeamScore {

  private static final String TIMESTAMP_ATTRIBUTE = "timestamp_ms";

  private static DateTimeFormatter fmt =
      DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss.SSS")
          .withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("PST")));

  static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
  static final Duration TEN_MINUTES = Duration.standardMinutes(10);

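  /**
   * A transform that extracts the configured key field (e.g. "team") from each
   * {@link GameActionInfo} event and counts the number of scoring events per key.
   * Unlike the 'sum' transforms in the earlier pipelines in this series, this transform
   * counts events; it does not sum the score values themselves.
   */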
  public static class ExtractAndCountScore
      extends PTransform<PCollection<GameActionInfo>, PCollection<KV<String, Long>>> {

    private final String field;

    ExtractAndCountScore(String field) {
      this.field = field;
    }

    @Override
    public PCollection<KV<String, Long>> apply(
        PCollection<GameActionInfo> gameInfo) {
      return gameInfo
          .apply(MapElements
              .via((GameActionInfo gInfo) -> KV.of(gInfo.getKey(field), gInfo.getScore()))
              .withOutputType(new TypeDescriptor<KV<String, Integer>>() {}))
          .apply(Count.<String, Integer>perKey());
    }
  }

  /**
   * Format per-key count results, and write that info to BigQuery.
   * Optionally include fixed-window start and pane-timing information in the result.
   */
  public static class WriteScoresToBigQuery
      extends PTransform<PCollection<KV<String, Long>>, PDone> {

    private final String fieldName;
    private final String tablePrefix;
    private final boolean writeTiming; // Whether to write pane timing info to the resultant table.
    private final boolean writeWindowStart; // Whether to include window start info.

    public WriteScoresToBigQuery(String tablePrefix, String fieldName,
        boolean writeWindowStart, boolean writeTiming) {
      this.fieldName = fieldName;
      this.tablePrefix = tablePrefix;
      this.writeWindowStart = writeWindowStart;
      this.writeTiming = writeTiming;
    }

    /** Convert each key/count pair into a BigQuery TableRow. */
    private class BuildFixedRowFn extends DoFn<KV<String, Long>, TableRow>
        implements RequiresWindowAccess {

      @Override
      public void processElement(ProcessContext c) {
        TableRow row = new TableRow()
            .set(fieldName, c.element().getKey())
            .set("count", c.element().getValue().longValue())
            .set("processing_time", fmt.print(Instant.now()));
        if (writeWindowStart) {
          IntervalWindow w = (IntervalWindow) c.window();
          row.set("window_start", fmt.print(w.start()));
        }
        if (writeTiming) {
          // The pane timing, e.g. EARLY, ON_TIME, or LATE.
          row.set("timing", c.pane().getTiming().toString());
        }
        c.output(row);
      }
    }

    /** Build the output table schema. */
    private TableSchema getFixedSchema() {
      List<TableFieldSchema> fields = new ArrayList<>();
      fields.add(new TableFieldSchema().setName(fieldName).setType("STRING"));
      fields.add(new TableFieldSchema().setName("count").setType("INTEGER"));
      fields.add(new TableFieldSchema().setName("processing_time").setType("STRING"));
      if (writeWindowStart) {
        fields.add(new TableFieldSchema().setName("window_start").setType("STRING"));
      }
      if (writeTiming) {
        fields.add(new TableFieldSchema().setName("timing").setType("STRING"));
      }
      return new TableSchema().setFields(fields);
    }

    @Override
    public PDone apply(PCollection<KV<String, Long>> teamAndScore) {
      return teamAndScore
          .apply(ParDo.named("ConvertToFixedTriggersRow").of(new BuildFixedRowFn()))
          .apply(BigQueryIO.Write
              .to(getTable(teamAndScore.getPipeline(),
                  tablePrefix + "_" + fieldName))
              .withSchema(getFixedSchema())
              .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED));
    }
  }

  /**
   * Options supported by {@link LeaderBoardCount}.
   */
  interface Options extends HourlyTeamScore.Options, DataflowExampleOptions {

    @Description("Pub/Sub topic to read from")
    @Validation.Required
    String getTopic();
    void setTopic(String value);

    @Description("Numeric value of fixed window duration for team analysis, in minutes")
    @Default.Integer(60)
    Integer getTeamWindowDuration();
    void setTeamWindowDuration(Integer value);

    @Description("Numeric value of allowed data lateness, in minutes")
    @Default.Integer(120)
    Integer getAllowedLateness();
    void setAllowedLateness(Integer value);

    @Description("Prefix used for the BigQuery table names")
    @Default.String("LeaderBoardCount")
    String getTableName();
    void setTableName(String value);
  }

  public static void main(String[] args) throws Exception {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    // Enforce that this pipeline is always run in streaming mode.
    options.setStreaming(true);
    // For example purposes, allow the pipeline to be easily cancelled instead of running
    // continuously.
    options.setRunner(DataflowPipelineRunner.class);
    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
    Pipeline pipeline = Pipeline.create(options);

    // Read game events from Pub/Sub using custom timestamps, which are extracted from the pubsub
    // data elements, and parse the data.
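    // The timestampLabel setting tells PubsubIO to use the value of each message's
    // "timestamp_ms" attribute as the element's event timestamp, rather than the Pub/Sub
    // publish time, so windowing is based on event time. (The Injector publishes messages
    // with this attribute set.)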
    PCollection<GameActionInfo> gameEvents = pipeline
        .apply(PubsubIO.Read.timestampLabel(TIMESTAMP_ATTRIBUTE).topic(options.getTopic()))
        .apply(ParDo.named("ParseGameEvent").of(new ParseEventFn()));

    // Extract team/score pairs from the event stream, using hour-long windows by default.
    gameEvents
        .apply(Window.named("LeaderBoardCountTeamFixedWindows")
            .<GameActionInfo>into(FixedWindows.of(
                Duration.standardMinutes(options.getTeamWindowDuration())))
            // We will get early (speculative) results as well as cumulative
            // processing of late data.
            .triggering(
                AfterWatermark.pastEndOfWindow()
                    .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(FIVE_MINUTES))
                    .withLateFirings(AfterProcessingTime.pastFirstElementInPane()
                        .plusDelayOf(TEN_MINUTES)))
            .withAllowedLateness(Duration.standardMinutes(options.getAllowedLateness()))
            .accumulatingFiredPanes())
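        // Concretely, for each fixed window: speculative panes fire repeatedly, five minutes
        // of processing time after the first element in each pane; an on-time pane fires when
        // the watermark passes the end of the window; and late-arriving data (within the
        // allowed lateness) fires additional panes ten minutes after it arrives. Because panes
        // accumulate, each firing emits the full count so far for that window.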
        // Count the scoring events per team over the windowed data.
        .apply("ExtractTeamScore", new ExtractAndCountScore("team"))
        // Write the results to BigQuery.
        .apply("WriteTeamScoreSums",
            new WriteScoresToBigQuery(options.getTableName(), "team", true, true));
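    // With the defaults above, results land in the BigQuery table LeaderBoardCount_team in the
    // dataset given by --dataset, with the window_start and timing columns included.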

    // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the
    // command line.
    PipelineResult result = pipeline.run();
    dataflowUtils.waitToFinish(result);
  }
}