Skip to content

Instantly share code, notes, and snippets.

@matthieucham
Created January 15, 2021 20:55
Show Gist options
  • Save matthieucham/85459eff5fdea8d115be520e2dd5ccc1 to your computer and use it in GitHub Desktop.
Save matthieucham/85459eff5fdea8d115be520e2dd5ccc1 to your computer and use it in GitHub Desktop.
Beam transform to work around the BEAM-3772 issue: create an empty table before batch-loading the PCollection into it.
import com.google.cloud.bigquery.*;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.joda.time.Duration;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
/**
 * Ensures that every BigQuery table targeted by the input elements exists before the elements are
 * passed downstream, creating any missing table (empty, with a fixed schema).
 *
 * <p>Intended as a workaround for BEAM-3772: the destination table is created before the
 * {@link PCollection} is batch-loaded into it.
 *
 * <p>Pipeline shape:
 * <ol>
 *   <li>elements are windowed into 15-second fixed windows;
 *   <li>each element is keyed by the table spec returned by {@code outputTableNameFn};
 *   <li>elements are grouped per (window, table);
 *   <li>for each group, the table is looked up (with a small per-DoFn-instance cache) and created
 *       with {@code targetTableSchema} if it does not exist;
 *   <li>the grouped elements are flattened back into individual elements.
 * </ol>
 *
 * <p>Note: the output keeps the 15-second fixed windowing applied here, and element order is not
 * preserved. With the default trigger, a group is only emitted once its window closes.
 *
 * <p>Accepted table spec forms: {@code project:dataset.table}, {@code project.dataset.table} and
 * {@code dataset.table}. The project part of the first form may itself contain {@code ':'}
 * (domain-scoped projects).
 *
 * @param <T> element type
 */
public class BigQueryAutoCreateTable<T> extends PTransform<PCollection<T>, PCollection<T>> {

    /** Width of the fixed windows used to batch elements before checking their target tables. */
    private static final Duration BATCH_WINDOW = Duration.standardSeconds(15L);

    private final SerializableFunction<T, String> outputTableNameFn;
    private final Schema targetTableSchema;

    /**
     * Creates the transform.
     *
     * @param outputTableNameFn computes the destination table spec of an element
     * @param targetTableSchema schema used when a missing table has to be created
     * @param <T> element type
     * @return a transform that emits its input once the destination tables exist
     * @throws NullPointerException if either argument is {@code null}
     */
    public static final <T> BigQueryAutoCreateTable<T> via(SerializableFunction<T, String> outputTableNameFn,
                                                          Schema targetTableSchema) {
        return new BigQueryAutoCreateTable<>(outputTableNameFn, targetTableSchema);
    }

    private BigQueryAutoCreateTable(SerializableFunction<T, String> outputTableNameFn,
                                    Schema targetTableSchema) {
        this.outputTableNameFn = Objects.requireNonNull(outputTableNameFn, "outputTableNameFn");
        this.targetTableSchema = Objects.requireNonNull(targetTableSchema, "targetTableSchema");
    }

    /**
     * For each (table spec, elements) group, creates the table if it is missing, then re-emits the
     * group's elements unchanged.
     */
    static class CreateIfNeeded<T> extends DoFn<KV<String, Iterable<T>>, Iterable<T>> {

        /** HTTP status BigQuery returns when creating a table that already exists. */
        private static final int HTTP_CONFLICT = 409;

        /** Maximum number of table specs whose existence is remembered per DoFn instance. */
        private static final int CACHE_SIZE = 10;

        private final Schema targetSchema;

        // Built in @Setup on each worker; transient because they are never meant to be serialized
        // with the DoFn.
        private transient Cache<String, Boolean> existingTablesCache;
        private transient BigQuery bigquery;

        private CreateIfNeeded(Schema targetSchema) {
            this.targetSchema = targetSchema;
        }

        @Setup
        public void setup() {
            this.existingTablesCache = CacheBuilder.newBuilder().maximumSize(CACHE_SIZE).build();
            this.bigquery = BigQueryOptions.getDefaultInstance().getService();
        }

        @ProcessElement
        public void processElement(@Element KV<String, Iterable<T>> element,
                                   OutputReceiver<Iterable<T>> out) {
            String target = element.getKey();
            TableId tableId = parseTableSpec(target);
            try {
                // The loader runs only on a cache miss; it records whether the table exists now.
                boolean exists = existingTablesCache.get(target, () -> bigquery.getTable(tableId) != null);
                if (!exists) {
                    createTable(tableId);
                    existingTablesCache.put(target, true);
                }
            } catch (ExecutionException e) {
                throw new RuntimeException("Failed to check whether BigQuery table exists: " + target, e);
            }
            // Pass the group's elements through unchanged to the next step.
            out.output(element.getValue());
        }

        /**
         * Creates an empty table with {@link #targetSchema}. A table that already exists is not an
         * error: another worker (or another window of the same table) may have created it
         * concurrently.
         */
        private void createTable(TableId tableId) {
            try {
                bigquery.create(TableInfo.of(tableId, StandardTableDefinition.of(targetSchema)));
            } catch (BigQueryException e) {
                if (e.getCode() != HTTP_CONFLICT) {
                    throw e;
                }
            }
        }

        /**
         * Parses a table spec into a {@link TableId}.
         *
         * <p>Accepted forms: {@code project:dataset.table} (the project may itself contain
         * {@code ':'}), {@code project.dataset.table} and {@code dataset.table}.
         *
         * @throws IllegalArgumentException if the spec matches none of these forms or has an empty
         *     component
         */
        static TableId parseTableSpec(String tableSpec) {
            String project = null;
            String datasetAndTable = tableSpec;
            // Split on the LAST ':' so domain-scoped projects ("example.com:proj") stay intact.
            int colon = tableSpec.lastIndexOf(':');
            if (colon >= 0) {
                project = tableSpec.substring(0, colon);
                datasetAndTable = tableSpec.substring(colon + 1);
            }
            // limit -1 keeps trailing empty parts so "dataset." is rejected instead of truncated.
            String[] parts = datasetAndTable.split("\\.", -1);
            String dataset;
            String table;
            if (parts.length == 2) {
                dataset = parts[0];
                table = parts[1];
            } else if (parts.length == 3 && project == null) {
                project = parts[0];
                dataset = parts[1];
                table = parts[2];
            } else {
                throw new IllegalArgumentException("Invalid BigQuery table spec: " + tableSpec);
            }
            if (dataset.isEmpty() || table.isEmpty() || (project != null && project.isEmpty())) {
                throw new IllegalArgumentException("Invalid BigQuery table spec: " + tableSpec);
            }
            return project == null ? TableId.of(dataset, table) : TableId.of(project, dataset, table);
        }
    }

    @Override
    public PCollection<T> expand(PCollection<T> input) {
        return input
                .apply("Batchify input", Window.<T>into(FixedWindows.of(BATCH_WINDOW)))
                // An explicit key type lets Beam pick the key coder even when
                // outputTableNameFn is a lambda, whose output type is erased.
                .apply("Compute target table name",
                        WithKeys.of(outputTableNameFn).withKeyType(TypeDescriptors.strings()))
                .apply(GroupByKey.create())
                .apply("Create target table if needed",
                        ParDo.of(new CreateIfNeeded<T>(targetTableSchema)))
                .apply("Flatten messages lots", Flatten.iterables());
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment