Skip to content

Instantly share code, notes, and snippets.

@polleyg
Last active June 29, 2019 10:41
Show Gist options
  • Save polleyg/051f07683cfc19932db9c9c4f079b079 to your computer and use it in GitHub Desktop.
Save polleyg/051f07683cfc19932db9c9c4f079b079 to your computer and use it in GitHub Desktop.
This code works out the location of the buckets and also the storage class
//imports & doc omitted for brevity. See repo for full source file.
//https://github.com/polleyg/gcp-dataflow-copy-bigquery/blob/master/src/main/java/org/polleyg/BQTableCopyPipeline.java
/**
 * Copies BigQuery tables by building and running one Beam pipeline per entry in the
 * {@code copies} list of the {@code config.yaml} classpath resource.
 *
 * <p>For each copy, rows are read from the source table with
 * {@code BigQueryIO.readTableRows()} and written to the target table with
 * {@code BigQueryIO.writeTableRows()}. Two GCS buckets are requested per copy: an "export"
 * bucket named after the source dataset location (also used for the pipeline's temp and
 * staging locations) and an "import" bucket named after the target dataset location (used as
 * the custom GCS temp location of the BigQuery write).
 */
public class BQTableCopyPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(BQTableCopyPipeline.class);

    /** Name of the classpath resource holding the copy configuration. */
    private static final String CONFIG_RESOURCE = "config.yaml";

    // Defaults applied when a copy entry omits the corresponding key.
    private static final String DEFAULT_NUM_WORKERS = "1";
    private static final String DEFAULT_MAX_WORKERS = "3";
    private static final String DEFAULT_TYPE_WORKERS = "n1-standard-1";
    private static final String DEFAULT_ZONE = "australia-southeast1-a";
    private static final String DEFAULT_WRITE_DISPOSITION = "truncate";
    private static final String DEFAULT_DETECT_SCHEMA = "true";

    /**
     * Entry point.
     *
     * @param args command line arguments, passed to {@code PipelineOptionsFactory.fromArgs}
     * @throws Exception if the configuration cannot be loaded or a copy cannot be set up
     */
    public static void main(String[] args) throws Exception {
        new BQTableCopyPipeline().copy(args);
    }

    /**
     * Loads the configuration, builds the pipeline options and runs every configured copy.
     *
     * @param args command line arguments used to seed the pipeline options
     * @throws Exception if the configuration cannot be read or parsed
     */
    private void copy(final String[] args) throws Exception {
        Config config = loadConfig();
        // Still a NullPointerException when 'copies' is absent, but now with a useful message.
        checkNotNull(config.copies, "'copies' must be specified in config");

        PipelineOptionsFactory.register(DataflowPipelineOptions.class);
        DataflowPipelineOptions options = PipelineOptionsFactory
                .fromArgs(args)
                .as(DataflowPipelineOptions.class);
        options.setProject(config.project);
        options.setRunner(GCPHelpers.getRunnerClass(config.runner));
        LOG.info("Project set to '{}' and runner set to '{}'", config.project, config.runner);

        // The same options instance is reused for every copy; each copy overwrites the
        // per-copy settings (workers, zone, locations, job name) before its pipeline is created.
        config.copies.forEach(copy -> setupAndRunPipeline(options, copy));
    }

    /**
     * Reads and parses the YAML configuration from the classpath.
     *
     * <p>The resource is read as a stream rather than through {@code java.io.File}, so it also
     * works when the resource is packaged inside a JAR or its location contains characters
     * that are URL-encoded.
     *
     * @return the parsed configuration
     * @throws java.io.IOException if the resource cannot be read or parsed
     * @throws IllegalStateException if the resource is not on the classpath
     */
    private Config loadConfig() throws java.io.IOException {
        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
        try (java.io.InputStream in =
                     getClass().getClassLoader().getResourceAsStream(CONFIG_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException(
                        format("'%s' was not found on the classpath", CONFIG_RESOURCE));
            }
            return mapper.readValue(in, Config.class);
        }
    }

    /**
     * Applies the settings of a single copy entry to the pipeline options and runs the copy.
     *
     * @param options pipeline options shared by all copies; mutated by this method
     * @param copy    key/value settings of one copy; {@code source} and {@code target} are
     *                required, every other key falls back to a default
     * @throws NullPointerException  if {@code source} or {@code target} is missing
     * @throws NumberFormatException if a worker count is not a valid integer
     */
    private void setupAndRunPipeline(final DataflowPipelineOptions options,
                                     final Map<String, String> copy) {
        LOG.info("Running a copy for '{}'", copy);

        String sourceTable = checkNotNull(copy.get("source"), "Source table cannot be null");
        String targetTable = checkNotNull(copy.get("target"), "Target table cannot be null");
        int numWorkers = Integer.parseInt(copy.getOrDefault("numWorkers", DEFAULT_NUM_WORKERS));
        int maxNumWorkers = Integer.parseInt(copy.getOrDefault("maxNumWorkers", DEFAULT_MAX_WORKERS));
        boolean detectSchema = Boolean.parseBoolean(copy.getOrDefault("detectSchema", DEFAULT_DETECT_SCHEMA));
        String zone = copy.getOrDefault("zone", DEFAULT_ZONE);
        String worker = copy.getOrDefault("workerMachineType", DEFAULT_TYPE_WORKERS);
        WriteDisposition writeDisposition = GCPHelpers.getWriteDisposition(
                copy.getOrDefault("writeDisposition", DEFAULT_WRITE_DISPOSITION));
        // Optional: null means the target dataset is expected to exist already.
        String targetDatasetLocation = copy.get("targetDatasetLocation");

        options.setNumWorkers(numWorkers);
        options.setMaxNumWorkers(maxNumWorkers);
        options.setZone(zone);
        options.setWorkerMachineType(worker);

        // A null schema is permitted; the write then uses CREATE_NEVER.
        TableSchema schema = detectSchema ? GCPHelpers.getTableSchema(sourceTable) : null;

        runPipeline(options, schema, sourceTable, targetTable, targetDatasetLocation, writeDisposition);
    }

    /**
     * Prepares the GCS buckets, finalises the pipeline options and runs the read/write pipeline.
     *
     * @param options               pipeline options; temp location, staging location and job
     *                              name are set here
     * @param schema                schema for the target table, or {@code null} to write
     *                              without a schema using {@code CREATE_NEVER}
     * @param sourceTable           table to read from
     * @param targetTable           table to write to
     * @param targetDatasetLocation location from the config, or {@code null} if not specified
     * @param writeDisposition      write disposition applied to the target table
     */
    private void runPipeline(final DataflowPipelineOptions options,
                             final TableSchema schema,
                             final String sourceTable,
                             final String targetTable,
                             final String targetDatasetLocation,
                             final WriteDisposition writeDisposition) {
        String targetLocation = getTargetDatasetLocation(targetTable, targetDatasetLocation);
        String sourceLocation = GCPHelpers.getDatasetLocation(sourceTable);

        String exportBucket = format("%s_df_bqcopy_export_%s", options.getProject(), sourceLocation);
        String importBucket = format("%s_df_bqcopy_import_%s", options.getProject(), targetLocation);
        handleBucketCreation(exportBucket, sourceLocation);
        handleBucketCreation(importBucket, targetLocation);

        options.setTempLocation(format("gs://%s/tmp", exportBucket));
        options.setStagingLocation(format("gs://%s/jars", exportBucket));
        options.setJobName(format("bq-table-copy-%s-to-%s-%d", sourceLocation, targetLocation, currentTimeMillis()));

        LOG.info("Running Dataflow pipeline with options '{}'", options);
        Pipeline pipeline = Pipeline.create(options);
        PCollection<TableRow> rows = pipeline.apply(
                format("Read: %s", sourceTable), BigQueryIO.readTableRows().from(sourceTable));

        // Settings common to both cases; only schema and create disposition differ.
        BigQueryIO.Write<TableRow> write = BigQueryIO.writeTableRows()
                .to(targetTable)
                .withWriteDisposition(writeDisposition)
                .withCustomGcsTempLocation(StaticValueProvider.of(format("gs://%s", importBucket)));
        if (schema != null) {
            write = write.withSchema(schema).withCreateDisposition(CREATE_IF_NEEDED);
        } else {
            write = write.withCreateDisposition(CREATE_NEVER);
        }
        rows.apply(format("Write: %s", targetTable), write);

        // The result of run() is not awaited here.
        pipeline.run();
    }

    /**
     * Requests creation of a GCS bucket, tolerating the case where it already exists.
     *
     * @param name     bucket name
     * @param location bucket location
     * @throws IllegalStateException if creation fails with anything other than HTTP 409
     */
    private void handleBucketCreation(final String name,
                                      final String location) {
        try {
            GCPHelpers.createGCSBucket(name, location);
        } catch (StorageException e) {
            if (e.getCode() != HttpStatus.SC_CONFLICT) { // 409 == bucket already exists. That's ok.
                throw new IllegalStateException(e);
            }
        }
    }

    /**
     * Resolves the location of the target dataset.
     *
     * <p>When no location is configured, the location is looked up from the target table, so
     * the dataset has to exist. When a location is configured, dataset creation is attempted
     * in that location and an already existing dataset is treated as an error.
     *
     * @param targetTable           target table reference
     * @param targetDatasetLocation configured location, or {@code null} if not specified
     * @return the location of the target dataset
     * @throws IllegalStateException if the lookup fails, the dataset already exists, or
     *                               creation fails; the original exception is kept as cause
     */
    private String getTargetDatasetLocation(final String targetTable,
                                            final String targetDatasetLocation) {
        if (targetDatasetLocation == null) {
            //target dataset/table should already exist in this case
            String location;
            try {
                location = GCPHelpers.getDatasetLocation(targetTable);
            } catch (RuntimeException e) {
                throw new IllegalStateException("'targetDatasetLocation' wasn't specified in config, but it looks" +
                        " like the target dataset doesn't exist.", e);
            }
            assert location != null;
            return location;
        }

        //otherwise, try and create it for the user
        try {
            GCPHelpers.createBQDataset(targetTable, targetDatasetLocation);
        } catch (BigQueryException e) {
            if (e.getCode() == HttpStatus.SC_CONFLICT) { // 409 == dataset already exists
                throw new IllegalStateException(
                        format("'targetDatasetLocation' specified in config, but the dataset '%s' already exists",
                                targetTable), e);
            }
            throw new IllegalStateException(e);
        }
        return targetDatasetLocation;
    }

    /** Shape of {@code config.yaml} as bound by Jackson. */
    private static class Config {
        /** One map of settings per table copy. */
        @JsonProperty
        public List<Map<String, String>> copies;
        /** Value passed to {@code options.setProject}. */
        @JsonProperty
        public String project;
        /** Runner identifier, resolved through {@code GCPHelpers.getRunnerClass}. */
        @JsonProperty
        public String runner;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment