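import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.ValueInSingleWindow;

// options, doFn, project, dataset, table and schema are defined elsewhere.
Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply("Read_file_from_GCS",
        TextIO.read().from("gs://bucket_name/some_file.csv"))
    // doFn must emit TableRow elements containing a "time_ts" field.
    .apply("Transform_file",
        ParDo.of(doFn))
    .apply("Write_results_to_BQ_table_partition_by_date",
        BigQueryIO.writeTableRows()
            // Route each row to the partition matching its timestamp.
            .to((SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>) element -> {
                TableRow row = element.getValue();
                // "2018-03-07 ..." becomes "20180307" (yyyyMMdd).
                String partitionDate = row.get("time_ts")
                    .toString().replace("-", "").substring(0, 8);
                // Partition decorator: project:dataset.table$yyyyMMdd
                String destination = String.format("%s:%s.%s$%s",
                    project, dataset, table, partitionDate);
                return new TableDestination(destination, null);
            })
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
            .withSchema(schema));
pipeline.run();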
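
The destination function above builds a table spec with a partition decorator from the row's `time_ts` value. As a minimal sketch, the same string logic can be pulled out and checked on its own, without Beam. The class name `PartitionDecorator`, its method names, and the sample project, dataset and table values are hypothetical, and the sketch assumes `time_ts` always starts with a `yyyy-MM-dd` date.

```java
public class PartitionDecorator {

    // Hypothetical helper: turns a timestamp that starts with "yyyy-MM-dd"
    // into the "yyyyMMdd" partition suffix, using the same steps as the pipeline.
    static String partitionDate(String timestamp) {
        return timestamp.replace("-", "").substring(0, 8);
    }

    // Hypothetical helper: builds "project:dataset.table$yyyyMMdd".
    static String tableSpec(String project, String dataset, String table, String timestamp) {
        return String.format("%s:%s.%s$%s",
            project, dataset, table, partitionDate(timestamp));
    }

    public static void main(String[] args) {
        // Sample values are placeholders, not taken from the original snippet.
        System.out.println(
            tableSpec("my-project", "my_dataset", "my_table", "2018-03-07 12:34:56"));
        // prints "my-project:my_dataset.my_table$20180307"
    }
}
```

Keeping the string handling in a plain static method like this makes it possible to unit test the partition routing separately from the pipeline. A timestamp shorter than eight characters after the dashes are removed, or one in a different format, would need extra validation that this sketch does not include.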