Skip to content

Instantly share code, notes, and snippets.

@gxercavins
Created February 8, 2020 12:07
Show Gist options
  • Save gxercavins/fb048ac3ac37484bec265872ea9f1e8a to your computer and use it in GitHub Desktop.
Save gxercavins/fb048ac3ac37484bec265872ea9f1e8a to your computer and use it in GitHub Desktop.
SO question 60070098
package org.apache.beam.examples;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.QueryJobConfiguration;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BigQueryUpsert {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryUpsert.class);
public static interface MyOptions extends PipelineOptions {
@Description("Output BigQuery table <PROJECT_ID>:<DATASET>.<TABLE>")
String getOutput();
void setOutput(String s);
}
private static void MergeResults() throws Exception {
BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
QueryJobConfiguration queryConfig =
QueryJobConfiguration.newBuilder(
"MERGE upsert.full F "
+ "USING upsert.temp T "
+ "ON T.name = F.name "
+ "WHEN MATCHED THEN "
+ "UPDATE SET total = T.total "
+ "WHEN NOT MATCHED THEN "
+ "INSERT(name, total) "
+ "VALUES(name, total)")
.setUseLegacySql(false)
.build();
// Create a job ID so that we can safely retry.
JobId jobId = JobId.of(UUID.randomUUID().toString());
Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
// Wait for the query to complete.
queryJob = queryJob.waitFor();
// Check for errors
if (queryJob == null) {
throw new RuntimeException("Job no longer exists");
} else if (queryJob.getStatus().getError() != null) {
throw new RuntimeException(queryJob.getStatus().getError().toString());
}
}
@SuppressWarnings("serial")
public static void main(String[] args) throws Exception {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
String output = options.getOutput();
// Build the table schema for the output table.
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("name").setType("STRING"));
fields.add(new TableFieldSchema().setName("total").setType("INTEGER"));
TableSchema schema = new TableSchema().setFields(fields);
p
.apply("Create Data", Create.of("Start"))
.apply("Write", BigQueryIO
.<String>write()
.to(output)
.withFormatFunction(
(String dummy) ->
new TableRow().set("name", "tv").set("total", 15))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withSchema(schema));
PipelineResult res = p.run();
res.waitUntilFinish();
if (res.getState() == PipelineResult.State.DONE) {
LOG.info("Dataflow job is finished. Merging results...");
MergeResults();
LOG.info("All done :)");
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>df-upsert</artifactId>
<version>0.1</version>
<packaging>jar</packaging>
<properties>
<beam.version>2.17.0</beam.version>
<bigquery.version>v2-rev20181104-1.27.0</bigquery.version>
<google-clients.version>1.27.0</google-clients.version>
<guava.version>20.0</guava.version>
<hamcrest.version>2.1</hamcrest.version>
<jackson.version>2.9.8</jackson.version>
<joda.version>2.10.1</joda.version>
<junit.version>4.13-beta-1</junit.version>
<maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
<maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
<maven-jar-plugin.version>3.0.2</maven-jar-plugin.version>
<maven-shade-plugin.version>3.1.0</maven-shade-plugin.version>
<mockito.version>1.10.19</mockito.version>
<pubsub.version>v1-rev20181105-1.27.0</pubsub.version>
<slf4j.version>1.7.25</slf4j.version>
<spark.version>2.4.2</spark.version>
<hadoop.version>2.7.3</hadoop.version>
<maven-surefire-plugin.version>2.21.0</maven-surefire-plugin.version>
<nemo.version>0.1</nemo.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<configuration>
<parallel>all</parallel>
<threadCount>4</threadCount>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
</configuration>
<dependencies>
<dependency>
<groupId>org.apache.maven.surefire</groupId>
<artifactId>surefire-junit47</artifactId>
<version>${maven-surefire-plugin.version}</version>
</dependency>
</dependencies>
</plugin>
<!-- Ensure that the Maven jar plugin runs before the Maven
shade plugin by listing the plugin higher within the file. -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<version>${maven-jar-plugin.version}</version>
</plugin>
<!--
Configures `mvn package` to produce a bundled jar ("fat jar") for runners
that require this for job submission to a cluster.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>${project.artifactId}-bundled-${project.version}</finalName>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/LICENSE</exclude>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>${maven-exec-plugin.version}</version>
<configuration>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<profiles>
<profile>
<id>direct-runner</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<!-- Makes the DirectRunner available when running a pipeline. -->
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>portable-runner</id>
<activation>
<activeByDefault>true</activeByDefault>
</activation>
<!-- Makes the PortableRunner available when running a pipeline. -->
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-reference-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>apex-runner</id>
<!-- Makes the ApexRunner available when running a pipeline. -->
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-apex</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<!--
Apex depends on httpclient version 4.3.6, project has a transitive dependency to httpclient 4.0.1 from
google-http-client. Apex dependency version being specified explicitly so that it gets picked up. This
can be removed when the project no longer has a dependency on a different httpclient version.
-->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.3.6</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>commons-codec</groupId>
<artifactId>commons-codec</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--
Apex 3.6 is built against YARN 2.6. Version in the fat jar has to match
what's on the cluster, hence we need to repeat the Apex Hadoop dependencies here.
-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>${hadoop.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>dataflow-runner</id>
<!-- Makes the DataflowRunner available when running a pipeline. -->
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>flink-runner</id>
<!-- Makes the FlinkRunner available when running a pipeline. -->
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink_2.11</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>spark-runner</id>
<!-- Makes the SparkRunner available when running a pipeline. Additionally,
overrides some Spark dependencies to Beam-compatible versions. -->
<properties>
<netty.version>4.1.17.Final</netty.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-hadoop-file-system</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>runtime</scope>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>jul-to-slf4j</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
<version>${jackson.version}</version>
<scope>runtime</scope>
</dependency>
<!-- [BEAM-3519] GCP IO exposes netty on its API surface, causing conflicts with runners -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
<exclusions>
<exclusion>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
<profile>
<id>gearpump-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-gearpump</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>samza-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-samza</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
</profile>
<profile>
<id>nemo-runner</id>
<dependencies>
<dependency>
<groupId>org.apache.nemo</groupId>
<artifactId>nemo-compiler-frontend-beam</artifactId>
<version>${nemo.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>${hadoop.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
</profile>
</profiles>
<dependencies>
<!-- Adds a dependency on the Beam SDK. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- Adds a dependency on the Beam Google Cloud Platform IO module. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- Dependencies below this line are specific dependencies needed by the examples code. -->
<dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
<version>${google-clients.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-bigquery</artifactId>
<version>${bigquery.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-bigquery</artifactId>
<version>1.82.0</version>
</dependency>
<dependency>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
<version>${google-clients.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
<version>${pubsub.version}</version>
<exclusions>
<!-- Exclude an old version of guava that is being pulled
in by a transitive dependency of google-api-client -->
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava-jdk5</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>${joda.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
<!-- Add slf4j API frontend binding with JUL backend -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
<version>${slf4j.version}</version>
<!-- When loaded at runtime this will wire up slf4j to the JUL backend -->
<scope>runtime</scope>
</dependency>
<!-- Hamcrest and JUnit are required dependencies of PAssert,
which is used in the main code of DebuggingWordCount example. -->
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>${hamcrest.version}</version>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-library</artifactId>
<version>${hamcrest.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<!-- The DirectRunner is needed for unit tests. -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
@ruthjbajo
Copy link

Hi! Can you provide steps how to compile this and create a dataflow job? Thanks in advance this is example is really useful for my current tasks.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment