Skip to content

Instantly share code, notes, and snippets.

@feczo
Last active October 5, 2016 13:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save feczo/7a7c882affc60383ee32028049054925 to your computer and use it in GitHub Desktop.
/*
* Copyright (C) 2015 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.examples.complete;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.examples.common.DataflowExampleUtils;
import com.google.cloud.dataflow.examples.common.ExampleBigQueryTableOptions;
import com.google.cloud.dataflow.examples.common.ExamplePubsubTopicOptions;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.PipelineResult;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* A streaming Dataflow Example using BigQuery output.
*
* <p>This pipeline example reads JSON-encoded log entries from a Pub/Sub topic, extracts the
* {@code textPayload} field from each entry, and writes the extracted payloads to
* a BigQuery table.
*
* <p>By default, the example will run a separate pipeline to inject the data from the default
* {@literal --inputFile} to the Pub/Sub {@literal --pubsubTopic}. It will make it available for
* the streaming pipeline to process. You may override the default {@literal --inputFile} with the
* file of your choosing. You may also set {@literal --inputFile} to an empty string, which will
* disable the automatic Pub/Sub injection, and allow you to use separate tool to control the input
* to this example.
*
* <p>The example is configured to use the default Pub/Sub topic and the default BigQuery table
* from the example common package (there are no defaults for a general Dataflow pipeline).
* You can override them by using the {@literal --pubsubTopic}, {@literal --bigQueryDataset}, and
* {@literal --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
* the example will try to create them.
*
* <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
* and then exits.
*/
public class StreamingLogExtract {

  /** Pub/Sub message attribute that carries each element's publish timestamp. */
  private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "publishTime";

  /**
   * A DoFn that tokenizes lines of text into individual words.
   *
   * <p>NOTE(review): not referenced by the pipeline built in {@link #main}; retained for
   * backward compatibility. Consider removing if no external caller depends on it.
   */
  static class ExtractWords extends DoFn<String, String> {
    @Override
    public void processElement(ProcessContext c) {
      // Split on runs of non-letter characters; apostrophes stay inside words.
      for (String word : c.element().split("[^a-zA-Z']+")) {
        if (!word.isEmpty()) {
          c.output(word);
        }
      }
    }
  }

  /**
   * A DoFn that uppercases a word.
   *
   * <p>NOTE(review): not referenced by the pipeline built in {@link #main}; retained for
   * backward compatibility. Consider removing if no external caller depends on it.
   */
  static class Uppercase extends DoFn<String, String> {
    @Override
    public void processElement(ProcessContext c) {
      c.output(c.element().toUpperCase());
    }
  }

  /**
   * Converts incoming log entries into BigQuery rows, keeping only the {@code textPayload}
   * field of each entry.
   */
  static class ExtractPayload extends DoFn<TableRow, TableRow> {
    /**
     * In this example, put only the payload into a single BigQuery field.
     */
    @Override
    public void processElement(ProcessContext c) {
      c.output(new TableRow().set("textPayload", c.element().get("textPayload")));
    }

    /** Returns the output table schema: a single STRING column named {@code textPayload}. */
    static TableSchema getSchema() {
      // Use a plain list rather than double-brace initialization, which creates an
      // anonymous ArrayList subclass per call and is a known serialization hazard.
      List<TableFieldSchema> fields = new ArrayList<>();
      fields.add(new TableFieldSchema().setName("textPayload").setType("STRING"));
      return new TableSchema().setFields(fields);
    }
  }

  /** Command-line options for this example, including the injector's input file. */
  private interface StreamingLogExtractOptions
      extends ExamplePubsubTopicOptions, ExampleBigQueryTableOptions {
    @Description("Input file to inject to Pub/Sub topic")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();

    void setInputFile(String value);
  }

  /**
   * Sets up and starts streaming pipeline.
   *
   * @throws IOException if there is a problem setting up resources
   */
  public static void main(String[] args) throws IOException {
    StreamingLogExtractOptions options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(StreamingLogExtractOptions.class);
    options.setStreaming(true);
    // In order to cancel the pipelines automatically,
    // {@literal DataflowPipelineRunner} is forced to be used.
    options.setRunner(DataflowPipelineRunner.class);

    // Build the schema once and reuse it for both the example utils and the BigQuery sink.
    TableSchema schema = ExtractPayload.getSchema();
    options.setBigQuerySchema(schema);
    DataflowExampleUtils dataflowUtils = new DataflowExampleUtils(options);
    dataflowUtils.setup();

    Pipeline pipeline = Pipeline.create(options);
    // "<project>:<dataset>.<table>" spec expected by BigQueryIO.
    String tableSpec = options.getProject() + ":"
        + options.getBigQueryDataset() + "."
        + options.getBigQueryTable();
    pipeline
        .apply(PubsubIO.Read
            .topic(options.getPubsubTopic())
            .withCoder(TableRowJsonCoder.of())
            .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY))
        .apply(ParDo.of(new ExtractPayload()))
        .apply(BigQueryIO.Write.to(tableSpec)
            .withSchema(schema));

    PipelineResult result = pipeline.run();
    // dataflowUtils will try to cancel the pipeline and the injector before the program exits.
    dataflowUtils.waitToFinish(result);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment