Create a gist now

Instantly share code, notes, and snippets.

Embed
Write from Cloud Pub/Sub to BigQuery using Fileload and save cost on streaming inserts!
package ...
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.FILE_LOADS;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;
/**
 * Beam pipeline that reads JSON click events from a Cloud Pub/Sub topic, converts each message
 * to a BigQuery {@link TableRow}, and appends the rows to BigQuery using periodic load jobs
 * ({@code FILE_LOADS}) instead of streaming inserts.
 *
 * <p>The destination dataset and table are chosen per row from the {@code user_id} and
 * {@code campaign_id} fields of the event.
 */
public class ClickLogConsumer {
    /** Minutes between triggered BigQuery load jobs. */
    private static final int BATCH_INTERVAL = 15;
    /** Number of file shards requested from BigQueryIO for each load. */
    private static final int NUM_SHARDS = 100;
    // NOTE(review): the project id embedded in TOPIC ("pureapp-199411") differs from PROJECT
    // ("pure-app") -- confirm that reading from one project and writing to another is intended.
    private static final String TOPIC = "projects/pureapp-199411/topics/clicks";
    private static final String PROJECT = "pure-app";
    /** Pub/Sub message attribute used as the element timestamp. */
    private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
    /** Column on which newly created tables are time-partitioned. */
    private static final String TIME_PARTITIONING_COLUMN = "created_at";
    /** Row field whose value names the destination dataset. */
    private static final String DATASET_FIELD = "user_id";
    /** Row field whose value names the destination table. */
    private static final String TABLE_FIELD = "campaign_id";

    // NOTE(review): the routing fields (user_id, campaign_id) are read from every row but are not
    // declared in this schema -- confirm the load jobs accept rows carrying those extra fields.
    private static final TableSchema SCHEMA = new TableSchema().setFields(
        ImmutableList.of(
            new TableFieldSchema().setName(TIME_PARTITIONING_COLUMN).setType("TIMESTAMP"),
            new TableFieldSchema().setName("exchange").setType("STRING")));

    /**
     * Creates a transform that parses JSON strings into BigQuery {@link TableRow}s.
     *
     * @return a new transform from a collection of JSON strings to a collection of table rows
     */
    public static PTransform<PCollection<String>, PCollection<TableRow>> jsonToTableRow() {
        return new JsonToTableRow();
    }

    /** Decodes every JSON string of the input collection with {@link TableRowJsonCoder}. */
    private static class JsonToTableRow
            extends PTransform<PCollection<String>, PCollection<TableRow>> {
        @Override
        public PCollection<TableRow> expand(PCollection<String> stringPCollection) {
            return stringPCollection.apply("JsonToTableRow", MapElements.<String, TableRow>via(
                new SimpleFunction<String, TableRow>() {
                    @Override
                    public TableRow apply(String json) {
                        // Use the Charset overload: no charset lookup by name, no checked exception.
                        InputStream inputStream =
                            new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
                        try {
                            //OUTER is used here to prevent EOF exception
                            return TableRowJsonCoder.of().decode(inputStream, Coder.Context.OUTER);
                        } catch (IOException e) {
                            throw new RuntimeException("Unable to parse input", e);
                        }
                    }
                }));
        }
    }

    /**
     * Returns the string form of a field that is needed to route a row to its destination table.
     *
     * @param row the decoded click event
     * @param field name of the routing field to read
     * @return the field value converted with {@code toString()}
     * @throws IllegalArgumentException if the row has no value for {@code field}
     */
    private static String requiredField(TableRow row, String field) {
        Object value = row.get(field);
        if (value == null) {
            throw new IllegalArgumentException(
                "Click event is missing required routing field '" + field + "'");
        }
        return value.toString();
    }

    /**
     * Builds and starts the pipeline.
     *
     * @param args command-line arguments, parsed into Beam {@link PipelineOptions}
     * @throws Exception if pipeline construction or submission fails
     */
    public static void main(String[] args) throws Exception {
        // The original referenced an undeclared 'options'; build it from the command line.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply(PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(TOPIC))
            .apply(jsonToTableRow())
            .apply(BigQueryIO.write()
                .withTriggeringFrequency(Duration.standardMinutes(BATCH_INTERVAL))
                .withMethod(FILE_LOADS)
                .withNumFileShards(NUM_SHARDS)
                .withWriteDisposition(WRITE_APPEND)
                .withCreateDisposition(CREATE_IF_NEEDED)
                .withSchema(SCHEMA)
                .to((row) -> {
                    TableRow tableRow = row.getValue();
                    String datasetName = requiredField(tableRow, DATASET_FIELD);
                    String tableName = requiredField(tableRow, TABLE_FIELD);
                    // BigQueryIO documents withTimePartitioning() as single-table only; when the
                    // destination is computed per row, the partitioning is attached to the
                    // TableDestination itself.
                    return new TableDestination(
                        String.format("%s:%s.%s", PROJECT, datasetName, tableName),
                        "Some destination",
                        new TimePartitioning().setField(TIME_PARTITIONING_COLUMN));
                }));
        pipeline.run();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment