Created December 5, 2017 04:56
// Assumes static imports of CREATE_IF_NEEDED and WRITE_APPEND from
// BigQueryIO.Write.CreateDisposition and BigQueryIO.Write.WriteDisposition.
Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply("Read_file_from_GCS",
        TextIO.read().from("gs://bucket_name/some_file.csv"))
    .apply("Transform_file",
        ParDo.of(doFn))
    .apply("Write_results_to_BQ",
        BigQueryIO.writeTableRows()
            // Route each row to a table named after its "section" field.
            .to(new DynamicDestinations<TableRow, String>() {
              @Override
              public String getDestination(ValueInSingleWindow<TableRow> element) {
                return element.getValue().get("section").toString();
              }

              @Override
              public TableDestination getTable(String table) {
                String destination = String.format("%s:%s.%s", project, dataset, table);
                // Second argument is the table description; the table spec is reused here.
                return new TableDestination(destination, destination);
              }

              @Override
              public TableSchema getSchema(String table) {
                return getSchemaForTable(table);
              }
            })
            .withCreateDisposition(CREATE_IF_NEEDED)
            .withWriteDisposition(WRITE_APPEND));
pipeline.run();