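import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;

import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.ValueInSingleWindow;

// options, doFn, project, dataset and getSchemaForTable(...) are defined elsewhere.
// doFn must turn each CSV line (String) into a TableRow that has a "section" field.
Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply("Read_file_from_GCS",
        TextIO.read().from("gs://bucket_name/some_file.csv"))
    .apply("Transform_file",
        ParDo.of(doFn))
    .apply("Write_results_to_BQ",
        BigQueryIO.writeTableRows()
            .to(new DynamicDestinations<TableRow, String>() {
              // Route each row by the value of its "section" field.
              @Override
              public String getDestination(ValueInSingleWindow<TableRow> element) {
                return element.getValue().get("section").toString();
              }

              // Map the destination key to a "project:dataset.table" spec.
              @Override
              public TableDestination getTable(String table) {
                String destination = String.format("%s:%s.%s", project, dataset, table);
                return new TableDestination(destination, destination);
              }

              // Schema used if the destination table has to be created.
              @Override
              public TableSchema getSchema(String table) {
                return getSchemaForTable(table);
              }
            })
            .withCreateDisposition(CREATE_IF_NEEDED)
            .withWriteDisposition(WRITE_APPEND));
pipeline.run();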