@maciekrb
Created December 25, 2017 12:25
Dataflow pipeline to read from a Google Pub/Sub topic and write into a BigQuery table
package com.healthifyme.dftrial;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class PubsubToBigQuery {

  public static void main(String[] args) {
    DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
    options.setProject("***");
    options.setTempLocation("***");
    options.setStagingLocation("***");
    options.setRunner(DataflowRunner.class);
    // Pub/Sub sources are unbounded, so the pipeline must run in streaming mode.
    options.setStreaming(true);

    // Topic to pull data from, in the form "projects/<project>/topics/<topic>".
    String TOPIC_NAME = "***";
    // BigQuery table to write to, in the form "<project>:<dataset>.<table>".
    String BQ_DS = "***";

    // Build the schema for the output table: one STRING and one INTEGER column.
    List<TableFieldSchema> fields = new ArrayList<>();
    fields.add(new TableFieldSchema().setName("name").setType("STRING"));
    fields.add(new TableFieldSchema().setName("age").setType("INTEGER"));
    TableSchema schema = new TableSchema().setFields(fields);

    Pipeline p = Pipeline.create(options);

    p
        // Read messages, keeping the attribute map so the DoFn below can use it.
        .apply(PubsubIO.readMessagesWithAttributes().fromTopic(TOPIC_NAME))
        .apply("ConvertDataToTableRows", ParDo.of(new DoFn<PubsubMessage, TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            PubsubMessage message = c.element();
            // Assumes every message carries "name" and "age" attributes; a missing
            // or non-numeric "age" would throw here and fail the bundle.
            String name = message.getAttribute("name");
            int age = Integer.parseInt(message.getAttribute("age"));
            TableRow row = new TableRow()
                .set("name", name)
                .set("age", age);
            c.output(row);
          }
        }))
        .apply("InsertTableRowsToBigQuery",
            BigQueryIO.writeTableRows().to(BQ_DS)
                .withSchema(schema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

    // Run the pipeline; for a streaming job this blocks until the job is cancelled.
    p.run().waitUntilFinish();
  }
}
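
For local testing, a message with the attributes the pipeline expects can be published with the Google Cloud Pub/Sub client library (the google-cloud-pubsub artifact). This is a minimal sketch, not part of the original gist; the project and topic IDs ("my-project", "my-topic") are placeholders standing in for the redacted "***" values above.

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

public class PublishTestMessage {
  public static void main(String[] args) throws Exception {
    // Placeholder project/topic; must match the topic the pipeline reads from.
    Publisher publisher = Publisher.newBuilder(TopicName.of("my-project", "my-topic")).build();
    try {
      // The pipeline reads "name" and "age" from the attribute map, not the
      // payload, so the data field can stay empty.
      PubsubMessage message = PubsubMessage.newBuilder()
          .setData(ByteString.copyFromUtf8(""))
          .putAttributes("name", "Alice")
          .putAttributes("age", "30")
          .build();
      // publish() returns an ApiFuture<String> holding the server-assigned message ID.
      String messageId = publisher.publish(message).get();
      System.out.println("Published message " + messageId);
    } finally {
      publisher.shutdown();
    }
  }
}

Note that the subscription Dataflow creates for the topic only receives messages published after the job starts, so the pipeline should be running before any test messages are sent.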