Skip to content

Instantly share code, notes, and snippets.

@zero-master
Created April 18, 2018 11:09
Show Gist options
  • Save zero-master/88c9f3698c91f64c158da6716ed24f58 to your computer and use it in GitHub Desktop.
Save zero-master/88c9f3698c91f64c158da6716ed24f58 to your computer and use it in GitHub Desktop.
Write from Cloud Pub/Sub to BigQuery using file loads and save the cost of streaming inserts!
package ...
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.FILE_LOADS;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;
/**
 * Beam pipeline that reads JSON click events from a Cloud Pub/Sub topic and appends them to
 * BigQuery using periodic load jobs ({@code FILE_LOADS}) instead of streaming inserts.
 *
 * <p>Each decoded row is routed to a destination table whose dataset name comes from the row's
 * {@code user_id} field and whose table name comes from its {@code campaign_id} field.
 */
public class ClickLogConsumer {
  /** Minutes between BigQuery load jobs. */
  private static final int BATCH_INTERVAL = 15;

  /** Number of file shards written per triggering interval. */
  private static final int NUM_SHARDS = 100;

  private static final String TOPIC = "projects/pureapp-199411/topics/clicks";

  // NOTE(review): the project id here differs from the one embedded in TOPIC — confirm that
  // reading from one project and writing to another is intended.
  private static final String PROJECT = "pure-app";

  /** Pub/Sub message attribute used as the element timestamp. */
  private static final String TIMESTAMP_ATTRIBUTE = "timestamp";

  /** Column that destination tables are time-partitioned on. */
  private static final String TIME_PARTITIONING_COLUMN = "created_at";

  /** Row field whose value is used as the destination dataset name. */
  private static final String DATASET_FIELD = "user_id";

  /** Row field whose value is used as the destination table name. */
  private static final String TABLE_FIELD = "campaign_id";

  /** Schema applied to tables created by the write. */
  private static final TableSchema SCHEMA =
      new TableSchema()
          .setFields(
              ImmutableList.of(
                  new TableFieldSchema().setName(TIME_PARTITIONING_COLUMN).setType("TIMESTAMP"),
                  new TableFieldSchema().setName("exchange").setType("STRING")));

  /**
   * Returns a transform that decodes each JSON string into a {@link TableRow}.
   *
   * @return a transform from JSON strings to table rows
   */
  public static PTransform<PCollection<String>, PCollection<TableRow>> jsonToTableRow() {
    return new JsonToTableRow();
  }

  /** Composite transform wrapping the per-element JSON decoding step. */
  private static class JsonToTableRow
      extends PTransform<PCollection<String>, PCollection<TableRow>> {
    @Override
    public PCollection<TableRow> expand(PCollection<String> stringPCollection) {
      return stringPCollection.apply("JsonToTableRow", MapElements.via(new ParseJsonFn()));
    }
  }

  /** Decodes a single JSON document into a {@link TableRow}. */
  private static class ParseJsonFn extends SimpleFunction<String, TableRow> {
    /**
     * Decodes {@code json} with {@link TableRowJsonCoder}.
     *
     * @param json the JSON text of one message
     * @return the decoded row
     * @throws RuntimeException if the text cannot be decoded; the {@link IOException} is the cause
     */
    @Override
    public TableRow apply(String json) {
      // The Charset overload of getBytes cannot fail, unlike the charset-name overload.
      InputStream inputStream = new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8));
      try {
        // OUTER is used here to prevent EOF exception
        return TableRowJsonCoder.of().decode(inputStream, Coder.Context.OUTER);
      } catch (IOException e) {
        throw new RuntimeException("Unable to parse input", e);
      }
    }
  }

  /**
   * Returns the string form of a field that routing depends on.
   *
   * @param row the decoded click event
   * @param fieldName the field to read
   * @return the field value converted with {@code toString()}
   * @throws IllegalArgumentException if the row has no value for {@code fieldName}
   */
  private static String requiredField(TableRow row, String fieldName) {
    Object value = row.get(fieldName);
    if (value == null) {
      throw new IllegalArgumentException(
          "Click event is missing routing field '" + fieldName + "'");
    }
    return value.toString();
  }

  /**
   * Builds the destination for one row: {@code PROJECT:<user_id>.<campaign_id>}, partitioned on
   * the {@code created_at} column.
   *
   * @param row the decoded click event
   * @return the table the row should be loaded into
   * @throws IllegalArgumentException if either routing field is absent from the row
   */
  private static TableDestination destinationFor(TableRow row) {
    String datasetName = requiredField(row, DATASET_FIELD);
    String tableName = requiredField(row, TABLE_FIELD);
    // With a per-element table function, the partitioning is attached to the destination itself
    // rather than configured through BigQueryIO.Write.withTimePartitioning.
    return new TableDestination(
        String.format("%s:%s.%s", PROJECT, datasetName, tableName),
        "Some destination",
        new TimePartitioning().setField(TIME_PARTITIONING_COLUMN));
  }

  /**
   * Builds and runs the pipeline.
   *
   * @param args command-line arguments, parsed into Beam pipeline options
   * @throws Exception if pipeline construction or submission fails
   */
  public static void main(String[] args) throws Exception {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply(PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(TOPIC))
        .apply(jsonToTableRow())
        .apply(
            // writeTableRows() fixes the element type to TableRow so the table function below
            // can read fields from the row.
            BigQueryIO.writeTableRows()
                .withTriggeringFrequency(Duration.standardMinutes(BATCH_INTERVAL))
                .withMethod(FILE_LOADS)
                .withNumFileShards(NUM_SHARDS)
                .withWriteDisposition(WRITE_APPEND)
                .withCreateDisposition(CREATE_IF_NEEDED)
                .withSchema(SCHEMA)
                .to(row -> destinationFor(row.getValue())));

    pipeline.run();
  }
}
@kirantpatil
Copy link

It is an honour to see people like you who share their knowledge.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment