Skip to content

Instantly share code, notes, and snippets.

@gxercavins
Created February 17, 2019 21:35
Show Gist options
  • Save gxercavins/4a6508501267d63854697767787d88f8 to your computer and use it in GitHub Desktop.
Save gxercavins/4a6508501267d63854697767787d88f8 to your computer and use it in GitHub Desktop.
Test for SO question 54638963
package com.dataflow.samples;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.Method;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
public class NonStandardDelimiters {
public static interface MyOptions extends PipelineOptions {
@Description("Output BigQuery table <PROJECT_ID>:<DATASET>.<TABLE>")
String getOutput();
void setOutput(String s);
@Description("Input file, gs://path/to/file")
String getInput();
void setInput(String s);
}
@SuppressWarnings("serial")
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
String file = options.getInput();
String output = options.getOutput();
// Build the table schema for the output table.
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("data").setType("STRING"));
TableSchema schema = new TableSchema().setFields(fields);
p
.apply("GetMessages", TextIO.read().from(file))
.apply("ExtractRows", ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
for (String line : c.element().split("\\\\x01\\\\n")) {
if (!line.isEmpty()) {
c.output(line);
}
}
}
}))
.apply("ToBQRow", ParDo.of(new DoFn<String, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TableRow row = new TableRow();
row.set("data", c.element());
c.output(row);
}
}))
.apply(BigQueryIO.writeTableRows().to(output)
.withSchema(schema)
.withMethod(Method.FILE_LOADS)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
p.run();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment