Skip to content

Instantly share code, notes, and snippets.

@dhalperi
Created March 30, 2017 15:48
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dhalperi/4bbd13021dd5f9998250cff99b155db6 to your computer and use it in GitHub Desktop.
Save dhalperi/4bbd13021dd5f9998250cff99b155db6 to your computer and use it in GitHub Desktop.
BigQuery example: load an external archive into date-partitioned tables
/*
* Copyright (C) 2016 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.dataflow.examples.partition;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.BigQueryIO.Write.WriteDisposition;
import com.google.cloud.dataflow.sdk.io.CountingInput;
import com.google.cloud.dataflow.sdk.io.Read;
import com.google.cloud.dataflow.sdk.options.Default;
import com.google.cloud.dataflow.sdk.options.Description;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.DoFn;
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.PTransform;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.transforms.Partition;
import com.google.cloud.dataflow.sdk.transforms.SimpleFunction;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.cloud.dataflow.sdk.values.PCollection;
import com.google.cloud.dataflow.sdk.values.PCollectionList;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.common.collect.ImmutableList;
import org.joda.time.DateTimeZone;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.ISODateTimeFormat;
import java.util.concurrent.ThreadLocalRandom;
/**
* A Dataflow pipeline that demonstrates the idiom of date-sharded BigQuery tables.
*
* <p>NOTE: Follow [BEAM-437] (https://issues.apache.org/jira/browse/BEAM-437).
* This code should become completely unnecessary.
*/
@SuppressWarnings("serial")
public class DateShardedBigQueryTables {
/**
 * A partition function that assigns an incoming element to a partition based on how many
 * whole days its date falls after a sentinel start time.
 *
 * <p>Partition {@code i}, for {@code 0 <= i < numPartitions - 1}, receives the elements whose
 * date lies in {@code [startDate + i days, startDate + (i + 1) days)}. The last partition
 * ({@code numPartitions - 1}) receives every element outside that range: those dated before
 * {@code startDate}, and those dated {@code numPartitions - 1} or more days after it.
 */
private static class DateShardingPartitionFunction<T> implements Partition.PartitionFn<T> {
  private final SimpleFunction<T, Instant> dateFn;
  private final Instant startDate;

  /**
   * @param dateFn extracts the date of an element.
   * @param startDate the instant that marks the beginning of day 0.
   */
  public DateShardingPartitionFunction(
      SimpleFunction<T, Instant> dateFn, Instant startDate) {
    this.dateFn = dateFn;
    this.startDate = startDate;
  }

  /**
   * Returns the index of the partition for {@code elem}.
   *
   * @param elem the element to route.
   * @param numPartitions the total number of partitions, including the trailing leftover one.
   * @return the day offset of the element if it is in range, else {@code numPartitions - 1}.
   */
  @Override
  public int partitionFor(T elem, int numPartitions) {
    int leftoverPartition = numPartitions - 1;
    Instant elementDate = dateFn.apply(elem);
    // Duration#getStandardDays() truncates toward zero, so an element less than one day
    // BEFORE startDate would yield an offset of 0 and be mistaken for a day-0 element.
    // Reject anything earlier than startDate explicitly before computing the offset.
    if (elementDate.isBefore(startDate)) {
      return leftoverPartition;
    }
    long dateOffset = new Duration(startDate, elementDate).getStandardDays();
    if (dateOffset < leftoverPartition) {
      return (int) dateOffset;
    }
    return leftoverPartition;
  }
}
/**
 * Inserts a {@link GroupByKey} into the pipeline so that the stage doing the partitioning
 * (which sees every element) is separated from the per-partition stages that follow it.
 *
 * <p>Each element is paired with a random integer key, the pairs are grouped by that key, and
 * the grouped values are then emitted again one by one with the key discarded.
 */
private static class Reshuffle<T> extends PTransform<PCollection<T>, PCollection<T>> {

  /** Wraps every input element in a {@link KV} whose key is a random {@code int}. */
  private static class AddArbitraryKey<T> extends DoFn<T, KV<Integer, T>> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
      int arbitraryKey = ThreadLocalRandom.current().nextInt();
      T payload = c.element();
      c.output(KV.of(arbitraryKey, payload));
    }
  }

  /** Drops the key of a grouped {@link KV} and re-emits each of its values individually. */
  private static class RemoveArbitraryKey<T> extends DoFn<KV<Integer, Iterable<T>>, T> {
    @Override
    public void processElement(ProcessContext c) throws Exception {
      Iterable<T> groupedValues = c.element().getValue();
      for (T value : groupedValues) {
        c.output(value);
      }
    }
  }

  @Override
  public PCollection<T> apply(PCollection<T> input) {
    PCollection<KV<Integer, T>> keyed = input.apply(ParDo.of(new AddArbitraryKey<T>()));
    PCollection<KV<Integer, Iterable<T>>> grouped =
        keyed.apply(GroupByKey.<Integer, T>create());
    return grouped.apply(ParDo.of(new RemoveArbitraryKey<T>()));
  }
}
/**
 * Builds the date-sharding portion of the pipeline. The input is split into
 * {@code numDays + 1} partitions: partition {@code i} (for {@code i} in {@code [0, numDays)})
 * corresponds to the day {@code i} days after {@code startDate} and is written to the table
 * named by {@code tableNameFn}; the final partition collects every element that falls outside
 * that date range and is written to {@code leftoverTableName}.
 *
 * @param input the values to be partitioned and written out
 * @param startDate the initial time reference -- aka, day 0.
 * @param numDays the number of consecutive days, beginning at the {@code startDate}, over which
 *     records will be partitioned.
 * @param dateFn a function that assigns an element to its timestamp.
 * @param tableNameFn a function that generates a BigQuery table name from a date.
 * @param tableRowFn a function that turns a value of type {@code T} into a {@link TableRow}
 *     ready for insertion to BigQuery. Must match {@code writeSchema}.
 * @param writeSchema the schema of the BigQuery tables into which the {@link TableRow} records
 *     will be written.
 * @param leftoverTableName the name of the table where all the records that fall outside the
 *     range {@code [startDate, startDate + numDays days)} will be inserted.
 */
public static <T> void shardByDate(
    PCollection<T> input,
    Instant startDate,
    int numDays,
    SimpleFunction<T, Instant> dateFn,
    SimpleFunction<Instant, String> tableNameFn,
    SimpleFunction<T, TableRow> tableRowFn,
    TableSchema writeSchema,
    String leftoverTableName) {
  // One partition per day plus a trailing partition for out-of-range elements.
  int partitionCount = numDays + 1;
  Partition.PartitionFn<T> dayRouter =
      new DateShardingPartitionFunction<T>(dateFn, startDate);
  PCollectionList<T> shards = input.apply(Partition.of(partitionCount, dayRouter));

  // Per-day partitions: each one goes to the table derived from its date.
  for (int dayOffset = 0; dayOffset < numDays; dayOffset++) {
    Instant shardDate = startDate.plus(Duration.standardDays(dayOffset));
    String shardTable = tableNameFn.apply(shardDate);
    PCollection<T> shard = shards.get(dayOffset);
    shard.apply(
        "Write_" + dayOffset,
        new ReshuffleAndWrite<T>(shardTable, tableRowFn, writeSchema));
  }

  // The last partition holds everything outside [startDate, startDate + numDays days).
  PCollection<T> leftovers = shards.get(numDays);
  leftovers.apply(
      "Write_leftovers",
      new ReshuffleAndWrite<T>(leftoverTableName, tableRowFn, writeSchema));
}
/**
 * A composite transform that reshuffles its input, converts every element to a
 * {@link TableRow}, and writes the rows to a single BigQuery table, truncating whatever the
 * table previously contained.
 */
private static class ReshuffleAndWrite<T> extends PTransform<PCollection<T>, PDone> {
  // NOTE(review): the fields are transient, presumably because they are only needed while the
  // pipeline graph is being constructed (inside apply) -- confirm before relying on this
  // transform surviving serialization.
  private final transient String tableName;
  private final transient TableSchema writeSchema;
  private final transient SimpleFunction<T, TableRow> tableRowFn;

  /**
   * @param tableName the BigQuery table to write to.
   * @param tableRowFn converts an element into the {@link TableRow} to be written.
   * @param writeSchema the schema passed to the BigQuery write.
   */
  public ReshuffleAndWrite(
      String tableName, SimpleFunction<T, TableRow> tableRowFn, TableSchema writeSchema) {
    this.tableName = tableName;
    this.tableRowFn = tableRowFn;
    this.writeSchema = writeSchema;
  }

  @Override
  public PDone apply(PCollection<T> in) {
    PCollection<T> reshuffled = in.apply("Reshuffle", new Reshuffle<T>());
    PCollection<TableRow> rows = reshuffled.apply("TableRow", MapElements.via(tableRowFn));
    return rows.apply(
        "Write",
        BigQueryIO.Write.to(tableName)
            .withSchema(writeSchema)
            .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE));
  }
}
/**
* A Joda-time formatter that prints a date in a format like {@code "20160101"}
* (pattern {@code yyyyMMdd}), always rendered in UTC. Threadsafe.
*
* <p>Used to build the per-day suffix of the output table names.
*/
private static final DateTimeFormatter FORMATTER =
DateTimeFormat.forPattern("yyyyMMdd").withZone(DateTimeZone.UTC);
/**
 * Command-line options of the example pipeline: the start date and number of output tables,
 * the amount of random data to generate, and the base name of the output table.
 */
interface Options extends PipelineOptions {
  // The value is parsed with an ISO-8601 parser, so the documented form is YYYY-MM-DD
  // (matching both the example and the default), not YYYYMMDD.
  @Description("The start date of the sharded tables in YYYY-MM-DD form, e.g., 2016-01-31")
  @Default.String("2016-01-01")
  String getStartDate();
  void setStartDate(String startDate);

  @Description("The number of days for which to produce output tables")
  @Default.Integer(100)
  Integer getNumOutputDays();
  void setNumOutputDays(Integer numOutputDays);

  @Description("The number of days over which to produce random data. If more than"
      + " --numOutputDays, there will be data in the leftovers table. If fewer than"
      + " --numOutputDays, there will be empty output tables.")
  @Default.Integer(105)
  Integer getNumDataDays();
  void setNumDataDays(Integer numDataDays);

  @Description("The number of records produced per day of data (set by --numDataDays).")
  @Default.Integer(5000)
  Integer getNumRecordsPerDay();
  void setNumRecordsPerDay(Integer numRecordsPerDay);

  @Description("The BigQuery partition table name. When writing, a per-day partition suffix "
      + " $YYYYMMDD will be appended. E.g., 'project:dataset.sharding_test$20161213'.")
  @Default.String("stress.sharding_test")
  String getOutputTable();
  void setOutputTable(String outputTable);
}
/**
 * Entry point: generates synthetic records, assigns each one a random day within
 * {@code --numDataDays} of {@code --startDate}, and writes them to per-day partitions of
 * {@code --outputTable} (plus a {@code _leftoverValues} table for out-of-range records).
 *
 * @param args command-line arguments, parsed into {@link Options}.
 */
public static void main(String[] args) {
  Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  Pipeline p = Pipeline.create(options);

  // Every output table has a single INTEGER column named "value".
  TableFieldSchema outputField = new TableFieldSchema().setName("value").setType("INTEGER");
  TableSchema outputSchema = new TableSchema().setFields(ImmutableList.of(outputField));

  // Parse the start date in UTC. Instant.parse(String) alone would interpret a zone-less date
  // such as "2016-01-01" in the JVM's default time zone, while FORMATTER prints table suffixes
  // in UTC -- east of UTC that shifts every table suffix back by one day.
  final Instant startDate =
      Instant.parse(options.getStartDate(), ISODateTimeFormat.dateTimeParser().withZoneUTC());
  final int numDataDays = options.getNumDataDays();
  // Widen to long before multiplying so that large option values cannot overflow int.
  final long totalNumRecords = (long) numDataDays * options.getNumRecordsPerDay();
  final String baseTableName = options.getOutputTable();

  // Ignores the record value and picks a random day in [startDate, startDate + numDataDays).
  SimpleFunction<Long, Instant> dateFn = new SimpleFunction<Long, Instant>() {
    @Override
    public Instant apply(Long value) {
      return startDate.plus(Duration.standardDays(
          ThreadLocalRandom.current().nextInt(numDataDays)));
    }
  };

  // Table name for a given day: "<baseTableName>$<yyyyMMdd>".
  SimpleFunction<Instant, String> tableNameFn = new SimpleFunction<Instant, String>() {
    @Override
    public String apply(Instant instant) {
      return String.format("%s$%s", baseTableName, FORMATTER.print(instant));
    }
  };

  // Each record becomes a row whose only field, "value", is the record itself.
  SimpleFunction<Long, TableRow> tableRowFn = new SimpleFunction<Long, TableRow>() {
    @Override
    public TableRow apply(Long value) {
      return new TableRow().set("value", value);
    }
  };

  // Creates the records; their dates are assigned by dateFn when they are partitioned.
  PCollection<Long> inputData =
      p.apply("GenerateData", CountingInput.upTo(totalNumRecords));

  shardByDate(
      inputData,
      startDate,
      options.getNumOutputDays(),
      dateFn,
      tableNameFn,
      tableRowFn,
      outputSchema,
      baseTableName + "_leftoverValues");

  p.run();
}
}
@vidhyasgr
Copy link

Can you please help me with how to partition an existing non-partitioned table? I want the cheapest and least time-consuming solution. It is kind of urgent.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment