@zero-master
Created April 18, 2018 11:09
Write from Cloud Pub/Sub to BigQuery using file loads and save on streaming-insert costs!
package ...
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.FILE_LOADS;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;
public class ClickLogConsumer {
    // How often buffered rows are flushed to BigQuery via a load job, in minutes.
    private static final int BATCH_INTERVAL = 15;
    // Number of files each load is sharded into; bounds load-job parallelism.
    private static final int NUM_SHARDS = 100;
    private static final String TOPIC = "projects/pureapp-199411/topics/clicks";
    private static final String PROJECT = "pure-app";
    // Pub/Sub message attribute carrying the event timestamp.
    private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
    private static final String TIME_PARTITIONING_COLUMN = "created_at";
    // JSON fields used to route each row to a per-user dataset and per-campaign table.
    private static final String DATASET_FIELD = "user_id";
    private static final String TABLE_FIELD = "campaign_id";

    private static final TableSchema SCHEMA = new TableSchema().setFields(
            ImmutableList.of(
                    new TableFieldSchema().setName(TIME_PARTITIONING_COLUMN).setType("TIMESTAMP"),
                    new TableFieldSchema().setName("exchange").setType("STRING"))
    );
    public static PTransform<PCollection<String>, PCollection<TableRow>> jsonToTableRow() {
        return new JsonToTableRow();
    }

    private static class JsonToTableRow
            extends PTransform<PCollection<String>, PCollection<TableRow>> {
        @Override
        public PCollection<TableRow> expand(PCollection<String> stringPCollection) {
            return stringPCollection.apply("JsonToTableRow", MapElements.via(
                    new SimpleFunction<String, TableRow>() {
                        @Override
                        public TableRow apply(String json) {
                            try {
                                InputStream inputStream = new ByteArrayInputStream(
                                        json.getBytes(StandardCharsets.UTF_8));
                                // Context.OUTER is used here to prevent an EOF exception.
                                return TableRowJsonCoder.of().decode(inputStream, Coder.Context.OUTER);
                            } catch (IOException e) {
                                throw new RuntimeException("Unable to parse input", e);
                            }
                        }
                    }));
        }
    }
    public static void main(String[] args) throws Exception {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline pipeline = Pipeline.create(options);
        pipeline
                .apply(PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(TOPIC))
                .apply(jsonToTableRow())
                .apply(BigQueryIO.writeTableRows()
                        .withTriggeringFrequency(Duration.standardMinutes(BATCH_INTERVAL))
                        .withMethod(FILE_LOADS)
                        .withNumFileShards(NUM_SHARDS)
                        .withWriteDisposition(WRITE_APPEND)
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withSchema(SCHEMA)
                        .to((row) -> {
                            // Route each row to a per-user dataset and per-campaign table.
                            String datasetName = row.getValue().get(DATASET_FIELD).toString();
                            String tableName = row.getValue().get(TABLE_FIELD).toString();
                            return new TableDestination(
                                    String.format("%s:%s.%s", PROJECT, datasetName, tableName),
                                    "Some destination");
                        })
                        .withTimePartitioning(new TimePartitioning().setField(TIME_PARTITIONING_COLUMN)));
        pipeline.run();
    }
}
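
For reference, here is a minimal sketch of publishing a test click event that matches this pipeline's input contract: a JSON payload containing user_id, campaign_id, created_at, and exchange, plus a timestamp message attribute that PubsubIO reads via withTimestampAttribute. It assumes the google-cloud-pubsub client library; the class name ClickPublisherExample and all sample field values are hypothetical.

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectTopicName;
import com.google.pubsub.v1.PubsubMessage;

public class ClickPublisherExample {
    public static void main(String[] args) throws Exception {
        // Same topic the pipeline above reads from.
        Publisher publisher = Publisher.newBuilder(
                ProjectTopicName.of("pureapp-199411", "clicks")).build();
        // Hypothetical sample event: user_id and campaign_id drive the
        // dataset/table routing; created_at feeds the time partitioning.
        String json = "{\"user_id\": \"user_1\", \"campaign_id\": \"campaign_1\", "
                + "\"created_at\": \"2018-04-18 11:09:00 UTC\", \"exchange\": \"adx\"}";
        PubsubMessage message = PubsubMessage.newBuilder()
                .setData(ByteString.copyFromUtf8(json))
                // Event-time attribute read by withTimestampAttribute(TIMESTAMP_ATTRIBUTE).
                .putAttributes("timestamp", "2018-04-18T11:09:00.000Z")
                .build();
        publisher.publish(message).get(); // block until the publish succeeds
        publisher.shutdown();
    }
}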
@kirantpatil

Hi,

I am new to BigQuery; please help me understand the terminology and the technique you employed.

Why is BATCH_INTERVAL set to 15?

What are shards, and why do we need to use them here?

Why is NUM_SHARDS set to 100?

Are we using a partitioned table, and why is it needed?

I am designing BigQuery for Google IoT Core, handling home-automation data for testing purposes.

It would be helpful if you explained your design in detail.

Thanks,
Kiran.

@kirantpatil

kirantpatil commented Aug 13, 2018

Hi,

I read your blog and some concepts are clear, but I need clarification on the points below.

  1. What are shards, and why do we need to use them here?

  2. Why is NUM_SHARDS set to 100?

  3. Why are we using partitioned tables?

  4. Correct me if I am wrong: I want to keep each user's information in their own dataset, so that it is easier to get analytics from a single table rather than from multiple partitioned tables?

  5. In your case, how are you getting analytics per day/week/month using Data Studio?

  6. What type of instance are you using for Dataflow to consume 1B to 50B messages?

  7. Are the messages stored in BigQuery in order, based on the timestamp at which they were created at the source (device/sensor)?

  8. How are you ensuring that all the messages are written to BigQuery without any loss within 24 hours?

Thanks,
Kiran.

@kirantpatil

It is an honour to see people like you who share knowledge.
