Skip to content

Instantly share code, notes, and snippets.

@spark2ignite
Last active Aug 14, 2017
Embed
What would you like to do?
How we saved over $240K per year by replacing Mixpanel with Google BigQuery, Pub/Sub, Dataflow & Kubernetes (code snippet #3)
package com.jellybtn;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.BigQueryIO;
import com.google.cloud.dataflow.sdk.io.PubsubIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
import com.google.cloud.dataflow.sdk.transforms.Filter;
import com.google.cloud.dataflow.sdk.transforms.Flatten;
import com.google.cloud.dataflow.sdk.transforms.MapElements;
import com.google.cloud.dataflow.sdk.transforms.ParDo;
import com.google.cloud.dataflow.sdk.values.*;
import org.json.simple.parser.ParseException;
import java.io.IOException;
public class JellybtnIngestFlow {
private static final TupleTag < TableRow > outputTag = new TupleTag < TableRow > () {};
private static final TupleTag < TableRow > errorsTag = new TupleTag < TableRow > () {};
private static PCollection < TableRow > handleEvents(Pipeline pipeline, JellybtnIngestFlowOptions options) throws IOException, ParseException {
PCollection < String > rawEvents = pipeline.apply("Read Events PubSub Messages",
PubsubIO.Read
.subscription(options.getEventsSubscriptionPath())
);
PCollectionTuple mappedEvents = rawEvents.apply("Map Events", ParDo
.withOutputTags(outputTag, TupleTagList.of(errorsTag))
.of(new MapEvents(errorsTag)));
PCollection < TableRow > events = mappedEvents.get(outputTag);
writeToTable(events,
TableSchemas.people(),
new TableReference().setProjectId(options.getBQProjectID()).setTableId(options.getEventsTableName()),
options.getDatasets(), options.getDatasetSuffix());
return mappedEvents.get(errorsTag);
}
private static PCollection < TableRow > handlePeople(Pipeline pipeline, JellybtnIngestFlowOptions options) throws IOException, ParseException {
PCollection < String > rawPeople = pipeline.apply("Read People PubSub Messages",
PubsubIO.Read
.subscription(options.getPeopleSubscriptionPath())
);
PCollectionTuple mappedPeople = rawPeople.apply("Map people", ParDo
.withOutputTags(outputTag, TupleTagList.of(errorsTag))
.of(new MapPeople(errorsTag)));
PCollection < TableRow > people = mappedPeople.get(outputTag);
writeToTable(people,
TableSchemas.people(),
new TableReference().setProjectId(options.getBQProjectID()).setTableId(options.getPeopleTableName()),
options.getDatasets(), options.getDatasetSuffix());
return mappedPeople.get(errorsTag);
}
private static void writeToTable(PCollection < TableRow > rows, TableSchema tableSchema, TableReference tableRef, String[] datasets, String dataSetSuffix) throws IOException {
for (String dataset: datasets) {
tableRef.setDatasetId(dataset + dataSetSuffix);
PCollection < TableRow > datasetRows = rows.apply("Filter for " + tableRef.toPrettyString() + " table", Filter.byPredicate((TableRow row) - > dataset.equals(row.get("dataset"))));
datasetRows = datasetRows.apply("Remap for " + tableRef.toPrettyString() + " table", MapElements.via((TableRow row) - > {
row.remove("dataset");
return row;
}).withOutputType(TypeDescriptor.of(TableRow.class)));
datasetRows.apply("Write BigQuery " + tableRef.toPrettyString() + " table",
BigQueryIO.Write
.to(tableRef)
.withSchema(tableSchema)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
);
}
}
public static void main(String[] args) throws IOException, ParseException {
PipelineOptionsFactory.register(JellybtnIngestFlowOptions.class);
JellybtnIngestFlowOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(JellybtnIngestFlowOptions.class);
options.setStreaming(true);
Pipeline pipeline = Pipeline.create(options);
PCollection < TableRow > errorEvents = handleEvents(pipeline, options);
PCollection < TableRow > errorPeople = handlePeople(pipeline, options);
PCollectionList < TableRow > joinedErrors = PCollectionList.of(errorEvents).and(errorPeople);
PCollection < TableRow > errors = joinedErrors.apply("Join errors", Flatten.pCollections());
writeToTable(errors,
TableSchemas.errors(),
new TableReference().setProjectId(options.getBQProjectID()).setTableId(options.getErrorsTableName()),
options.getDatasets(), options.getDatasetSuffix());
pipeline.run();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment