@polleyg · created September 6, 2018
Templated Dataflow pipeline for reading Wikipedia page views from a file and writing them to BigQuery
package org.polleyg;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import java.util.ArrayList;
import java.util.List;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED;
import static org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition.WRITE_APPEND;
/**
 * Templated Dataflow pipeline that reads Wikipedia page-view CSV records from a file,
 * parses each row into a BigQuery TableRow, and appends the rows to a BigQuery table.
 */
public class TemplatePipeline {
    public static void main(String[] args) {
        PipelineOptionsFactory.register(TemplateOptions.class);
        TemplateOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(TemplateOptions.class);
        Pipeline pipeline = Pipeline.create(options);
        // Read CSV lines, convert each one to a TableRow, and append to BigQuery.
        pipeline.apply("READ", TextIO.read().from(options.getInputFile()))
                .apply("TRANSFORM", ParDo.of(new WikiParDo()))
                .apply("WRITE", BigQueryIO.writeTableRows()
                        .to(String.format("%s:dotc_2018.wiki_demo", options.getProject()))
                        .withCreateDisposition(CREATE_IF_NEEDED)
                        .withWriteDisposition(WRITE_APPEND)
                        .withSchema(getTableSchema()));
        pipeline.run();
    }

    private static TableSchema getTableSchema() {
        // Column order must match the CSV layout declared in WikiParDo.HEADER.
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("month").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("day").setType("INTEGER"));
        fields.add(new TableFieldSchema().setName("wikimedia_project").setType("STRING"));
        fields.add(new TableFieldSchema().setName("language").setType("STRING"));
        fields.add(new TableFieldSchema().setName("title").setType("STRING"));
        fields.add(new TableFieldSchema().setName("views").setType("INTEGER"));
        return new TableSchema().setFields(fields);
    }

    public interface TemplateOptions extends DataflowPipelineOptions {
        @Description("GCS path of the file to read from")
        ValueProvider<String> getInputFile();

        void setInputFile(ValueProvider<String> value);
    }

    public static class WikiParDo extends DoFn<String, TableRow> {
        public static final String HEADER = "year,month,day,wikimedia_project,language,title,views";

        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            // Skip the header line.
            if (c.element().equalsIgnoreCase(HEADER)) return;
            String[] split = c.element().split(",");
            // Drop malformed rows: a valid row has exactly one value per schema column.
            if (split.length != 7) return;
            List<TableFieldSchema> columns = getTableSchema().getFields();
            TableRow row = new TableRow();
            for (int i = 0; i < split.length; i++) {
                row.set(columns.get(i).getName(), split[i]);
            }
            c.output(row);
        }
    }
}
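
To sanity-check the parsing logic without deploying anything, WikiParDo can be exercised locally with Beam's TestPipeline. The sketch below is illustrative only: it assumes JUnit 4 and the Beam test artifacts are on the test classpath, and the sample CSV line and expected values are made up for demonstration.

package org.polleyg;

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

public class TemplatePipelineTest {
    @Rule
    public final transient TestPipeline pipeline = TestPipeline.create();

    @Test
    public void parsesCsvLineAndSkipsHeader() {
        // One header line (should be dropped) and one illustrative data line (should be parsed).
        // The TableRow coder is registered by the Beam GCP IO module already used by the pipeline.
        PCollection<TableRow> rows = pipeline
                .apply(Create.of(TemplatePipeline.WikiParDo.HEADER,
                        "2018,9,6,wikipedia,en,Apache_Beam,42"))
                .apply(ParDo.of(new TemplatePipeline.WikiParDo()));

        // WikiParDo keeps every field as a string; BigQuery coerces types at load time.
        TableRow expected = new TableRow()
                .set("year", "2018").set("month", "9").set("day", "6")
                .set("wikimedia_project", "wikipedia").set("language", "en")
                .set("title", "Apache_Beam").set("views", "42");
        PAssert.that(rows).containsInAnyOrder(expected);

        pipeline.run().waitUntilFinish();
    }
}

To stage the pipeline as a reusable template, it is typically run once with the DataflowRunner and the --templateLocation option pointing at a GCS path; because getInputFile is a ValueProvider, the input file can then be supplied each time the template is executed rather than at construction time.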