Created December 5, 2017 04:51
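import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;

// Assumes options, doFn (a DoFn<String, TableRow>), project, dataset and schema (a TableSchema) are defined elsewhere.
// project and dataset must be effectively final Strings, because the lambda below captures them.
Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply("Read_file_from_GCS",
        TextIO.read().from("gs://bucket_name/some_file.csv"))   // one String element per line of the file
    .apply("Transform_file",
        ParDo.of(doFn))                                          // String -> TableRow
    .apply("Write_results_to_BQ_table_partition_by_date",
        BigQueryIO.writeTableRows()
            // Dynamic destination: each row is written to the table named by its "section" field.
            .to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>) element -> {
                TableRow row = element.getValue();
                String table = row.get("section").toString();
                String destination = String.format("%s:%s.%s", project, dataset, table);
                return new TableDestination(destination, destination);  // (tableSpec, tableDescription)
            })
            .withCreateDisposition(CREATE_IF_NEEDED)
            .withWriteDisposition(WRITE_APPEND)
            .withSchema(schema));
pipeline.run();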
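The per-row logic of this pipeline lives in doFn (not shown in the gist) and in the destination function passed to .to(...). The sketch below is a minimal plain-Java illustration of that logic, runnable without Beam. It rests on several assumptions: a hypothetical three-column CSV layout (date,section,value), Map<String, Object> as a stand-in for TableRow (which is itself a Map<String, Object>), and the hypothetical names SectionRouter, parseLine and tableSpec. parseLine approximates what doFn might do; tableSpec reproduces the "project:dataset.table" format used in the destination function.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of the per-row logic; Map<String, Object> stands in for TableRow.
class SectionRouter {

    // Hypothetical CSV layout: the real file's columns are not given in the gist.
    static final List<String> COLUMNS = Arrays.asList("date", "section", "value");

    // Roughly what doFn might do: split one CSV line into named fields (no quoted-comma support).
    static Map<String, Object> parseLine(String line) {
        String[] parts = line.split(",", -1);
        if (parts.length != COLUMNS.size()) {
            throw new IllegalArgumentException("Expected " + COLUMNS.size() + " fields: " + line);
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            row.put(COLUMNS.get(i), parts[i].trim());
        }
        return row;
    }

    // Same format string as the destination function: "project:dataset.table".
    static String tableSpec(String project, String dataset, Map<String, Object> row) {
        Object section = row.get("section");
        if (section == null) {
            // The gist's lambda would throw a NullPointerException here; fail with a clearer message instead.
            throw new IllegalArgumentException("Row has no 'section' field: " + row);
        }
        return String.format("%s:%s.%s", project, dataset, section);
    }

    public static void main(String[] args) {
        Map<String, Object> row = parseLine("2017-12-05,sports,42");
        System.out.println(tableSpec("my-project", "my_dataset", row));  // prints my-project:my_dataset.sports
    }
}
```

As written, the routing chooses one table per "section" value; it does not by itself configure date partitioning, despite the step name Write_results_to_BQ_table_partition_by_date.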