package ...;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.api.services.bigquery.model.TimePartitioning;
import com.google.common.collect.ImmutableList;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method.FILE_LOADS;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;
public class ClickLogConsumer {
    // How often (in minutes) a BigQuery load job is triggered for the buffered rows.
    private static final int BATCH_INTERVAL = 15;
    // How many files each triggered batch is sharded into before being loaded.
    private static final int NUM_SHARDS = 100;
    private static final String TOPIC = "projects/pureapp-199411/topics/clicks";
    private static final String PROJECT = "pure-app";
    // Pub/Sub message attribute carrying the event timestamp.
    private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
    private static final String TIME_PARTITIONING_COLUMN = "created_at";
    // Row fields used to route each element to a per-user dataset and per-campaign table.
    private static final String DATASET_FIELD = "user_id";
    private static final String TABLE_FIELD = "campaign_id";

    private static final TableSchema SCHEMA = new TableSchema().setFields(
        ImmutableList.of(
            new TableFieldSchema().setName(TIME_PARTITIONING_COLUMN).setType("TIMESTAMP"),
            new TableFieldSchema().setName("exchange").setType("STRING"))
    );

    public static PTransform<PCollection<String>, PCollection<TableRow>> jsonToTableRow() {
        return new JsonToTableRow();
    }
    private static class JsonToTableRow
            extends PTransform<PCollection<String>, PCollection<TableRow>> {

        @Override
        public PCollection<TableRow> expand(PCollection<String> stringPCollection) {
            return stringPCollection.apply("JsonToTableRow", MapElements.via(
                new SimpleFunction<String, TableRow>() {
                    @Override
                    public TableRow apply(String json) {
                        try {
                            InputStream inputStream = new ByteArrayInputStream(
                                json.getBytes(StandardCharsets.UTF_8));
                            // Context.OUTER is used here to prevent an EOFException:
                            // the value occupies the whole stream, so no length prefix is read.
                            return TableRowJsonCoder.of().decode(inputStream, Coder.Context.OUTER);
                        } catch (IOException e) {
                            throw new RuntimeException("Unable to parse input", e);
                        }
                    }
                }));
        }
    }
    public static void main(String[] args) throws Exception {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline pipeline = Pipeline.create(options);
        pipeline
            .apply(PubsubIO.readStrings().withTimestampAttribute(TIMESTAMP_ATTRIBUTE).fromTopic(TOPIC))
            .apply(jsonToTableRow())
            // writeTableRows() is required here: the input is already a PCollection<TableRow>,
            // and plain write() would demand a separate format function.
            .apply(BigQueryIO.writeTableRows()
                // With FILE_LOADS on an unbounded source, a load job is started
                // every BATCH_INTERVAL minutes over NUM_SHARDS staged files.
                .withTriggeringFrequency(Duration.standardMinutes(BATCH_INTERVAL))
                .withMethod(FILE_LOADS)
                .withNumFileShards(NUM_SHARDS)
                .withWriteDisposition(WRITE_APPEND)
                .withCreateDisposition(CREATE_IF_NEEDED)
                .withSchema(SCHEMA)
                // Route each row to a table named <project>:<user_id>.<campaign_id>.
                .to((row) -> {
                    String datasetName = row.getValue().get(DATASET_FIELD).toString();
                    String tableName = row.getValue().get(TABLE_FIELD).toString();
                    return new TableDestination(String.format("%s:%s.%s", PROJECT, datasetName, tableName), "Some destination");
                })
                .withTimePartitioning(new TimePartitioning().setField(TIME_PARTITIONING_COLUMN)));
        pipeline.run();
    }
}
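
For readers following along, here is a minimal, self-contained sketch of what the JsonToTableRow step does to a single payload; the sample JSON values are made up for illustration, not taken from the real topic.

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.bigquery.TableRowJsonCoder;

import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

public class JsonToTableRowDemo {
    public static void main(String[] args) throws Exception {
        // A click event as it might arrive on the topic; the values are invented.
        String json = "{\"user_id\":\"42\",\"campaign_id\":\"7\","
                + "\"created_at\":\"2018-04-01 00:00:00 UTC\",\"exchange\":\"adx\"}";
        // Same decode call the pipeline above makes for every message.
        TableRow row = TableRowJsonCoder.of().decode(
                new ByteArrayInputStream(json.getBytes(StandardCharsets.UTF_8)),
                Coder.Context.OUTER);
        System.out.println(row.get("user_id"));   // -> 42
        System.out.println(row.get("exchange"));  // -> adx
    }
}

Because the JSON value occupies the entire stream, decoding with Coder.Context.OUTER reads no length prefix, which is why the EOFException mentioned in the code comment never occurs.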
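And a minimal sketch of how such a pipeline might be launched on Dataflow; the runner, region, and bucket flags below are placeholder assumptions, not values from the original post. FILE_LOADS on an unbounded source needs streaming mode plus a temp location where the load files can be staged.

import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class LaunchDemo {
    public static void main(String[] args) {
        // Placeholder flags; in practice these come from the real args array.
        DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(
                "--runner=DataflowRunner",
                "--project=pure-app",
                "--region=us-central1",
                "--tempLocation=gs://example-bucket/tmp",
                "--streaming=true")
            .as(DataflowPipelineOptions.class);
        System.out.println(options.getProject());
    }
}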
Hi,
I read your blog and some concepts are clear, but I need clarification on the points below.
- What are shards, and why do we need to use them here?
- Why is NUM_SHARDS set to 100?
- Why are we using partitioned tables?
- Correct me if I am wrong: I want to retain user information in each user's own dataset, so that analytics can be pulled easily from a single table rather than from multiple partitioned tables?
- In your case, how are you getting per-day/week/month analytics using Data Studio?
- What type of instance are you using for Dataflow to consume 1B to 50B messages?
- Are the messages stored in BigQuery in order, based on the timestamp with which they were created at the source (device/sensor)?
- How are you ensuring that all the messages are written to BigQuery without any loss within 24 hours?
Thanks,
Kiran.
It is an honour to see guys like you who share knowledge.
Hi,
I am new to BigQuery; please help me understand, in detail, the terminology and techniques you employed.
Why is BATCH_INTERVAL set to 15?
What are shards, and why do we need to use them here?
Why is NUM_SHARDS set to 100?
Are we using a partitioned table, and why is it needed?
I am designing BigQuery for Google Cloud IoT Core, handling home-automation data for testing purposes.
It would be helpful if you explained your design in detail.
Thanks,
Kiran.