Created
January 15, 2021 20:55
-
-
Save matthieucham/85459eff5fdea8d115be520e2dd5ccc1 to your computer and use it in GitHub Desktop.
Beam transform to work around the BEAM-3772 issue: create an empty table before batch loading the PCollection into it.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.cloud.bigquery.*; | |
import org.apache.beam.sdk.transforms.*; | |
import org.apache.beam.sdk.transforms.windowing.FixedWindows; | |
import org.apache.beam.sdk.transforms.windowing.Window; | |
import org.apache.beam.sdk.values.KV; | |
import org.apache.beam.sdk.values.PCollection; | |
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache; | |
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder; | |
import org.joda.time.Duration; | |
import java.util.concurrent.ExecutionException; | |
public class BigQueryAutoCreateTable<T> extends PTransform<PCollection<T>, PCollection<T>> { | |
private SerializableFunction<T, String> outputTableNameFn; | |
private Schema targetTableSchema; | |
public static final <T> BigQueryAutoCreateTable via(SerializableFunction<T, String> outputTableNameFn, | |
Schema targetTableSchema) { | |
return new BigQueryAutoCreateTable(outputTableNameFn, targetTableSchema); | |
} | |
private BigQueryAutoCreateTable(SerializableFunction<T, String> outputTableNameFn, | |
Schema targetTabelSchema) { | |
this.outputTableNameFn = outputTableNameFn; | |
this.targetTableSchema = targetTabelSchema; | |
} | |
static class CreateIfNeeded<T> extends DoFn<KV<String, Iterable<T>>, Iterable<T>> { | |
private Cache<String, Boolean> existingTablesCache; | |
private BigQuery bigquery; | |
private Schema targetSchema; | |
private CreateIfNeeded(Schema targetSchema) { | |
this.targetSchema = targetSchema; | |
} | |
@Setup | |
public void setup() { | |
this.existingTablesCache = CacheBuilder.newBuilder().maximumSize(10).build(); | |
this.bigquery = BigQueryOptions.getDefaultInstance().getService(); | |
} | |
@ProcessElement | |
public void processElement(@Element KV<String, Iterable<T>> element, | |
OutputReceiver<Iterable<T>> out) { | |
String target = element.getKey(); | |
String[] projAndDs = target.split(":"); | |
String[] dsAndTab; | |
String project = null; | |
if (projAndDs.length > 0) { | |
dsAndTab = projAndDs[1].split("\\."); | |
project = projAndDs[0]; | |
} else { | |
dsAndTab = projAndDs[0].split("\\."); | |
} | |
String dataset = dsAndTab[0]; | |
String tablename = dsAndTab[1]; | |
TableId tableId; | |
if (project == null) { | |
tableId = TableId.of(dataset, tablename); | |
} else { | |
tableId = TableId.of(project, dataset, tablename); | |
} | |
try { | |
if (!this.existingTablesCache.get(target, () -> bigquery.getTable(tableId) != null)) { | |
// Table does not exist, create it | |
TableDefinition tabDef = StandardTableDefinition.of(this.targetSchema); | |
bigquery.create(TableInfo.of(tableId, tabDef)); | |
existingTablesCache.put(target, true); | |
} | |
} catch (ExecutionException e) { | |
throw new RuntimeException(e); | |
} | |
// Pass value (colletion of NewsletterMessage) to next step. | |
out.output(element.getValue()); | |
} | |
} | |
@Override | |
public PCollection<T> expand(PCollection<T> input) { | |
PCollection<T> withExstingTarget = input.apply( | |
"Batchify input", | |
Window.<T>into( | |
FixedWindows.of(Duration.standardSeconds(15l)) | |
) | |
) | |
.apply( | |
"Compute target table name", | |
WithKeys.of(this.outputTableNameFn) | |
) | |
.apply(GroupByKey.create()) | |
.apply( | |
"Create target table if needed", | |
ParDo.of(new CreateIfNeeded<T>(this.targetTableSchema)) | |
) | |
.apply("Flatten messages lots", | |
Flatten.iterables() | |
); | |
return withExstingTarget; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment