@cobookman
Last active June 6, 2017 17:24
group 'com.google.cloud.dataflow.teleport'
version '1.0-Alpha'

apply plugin: 'application'
apply plugin: 'java'

sourceCompatibility = 1.8
mainClassName = "com.google.cloud.dataflow.teleport.Main"

repositories {
    mavenCentral()
}

dependencies {
    compile group: 'com.google.cloud.dataflow', name: 'google-cloud-dataflow-java-sdk-all', version: '2.0.0'
    compile group: 'org.apache.beam', name: 'beam-sdks-java-extensions-protobuf', version: '2.0.0'
    // compile group: 'com.google.protobuf', name: 'protobuf-java', version: '3.1.0'
    compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.1.0'
    // compile group: 'com.google.code.gson', name: 'gson', version: '2.8.0'
    compile group: 'org.slf4j', name: 'slf4j-api', version: '1.7.21'
    compile group: 'org.slf4j', name: 'slf4j-jdk14', version: '1.7.21'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

// Ensure the main resources output directory exists before packaging
task resources {
    def resourcesDir = new File('build/resources/main')
    resourcesDir.mkdirs()
}

// Build a fat jar containing all compile dependencies, excluding jar signature files
task uberjar(type: Jar) {
    from files(sourceSets.main.output.classesDir)
    from {configurations.compile.collect {zipTree(it)}} {
        exclude "META-INF/*.SF"
        exclude "META-INF/*.DSA"
        exclude "META-INF/*.RSA"
    }
    manifest {
        attributes 'Main-Class': mainClassName
    }
}
package com.google.cloud.dataflow.teleport;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.TypeRegistry;
import com.google.datastore.v1.Entity;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.io.IOException;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
/**
 * Exports Datastore Entities to GCS as newline-delimited Protobuf v3 JSON.
 */
public class DatastoreToGcs {

  /**
   * Runs the DatastoreToGcs Dataflow pipeline.
   *
   * @param args pipeline options passed on the command line
   */
  public static void main(String[] args) throws IOException {
    Options options = PipelineOptionsFactory.fromArgs(args)
        .withValidation()
        .as(Options.class);
    // System.out.println("Pipeline runner of: " + options.getRunner().toString());

    // Build the Datastore-to-GCS pipeline
    Pipeline pipeline = Pipeline.create(options);

    PCollection<Entity> entities = pipeline.apply("IngestEntities",
        DatastoreIO.v1().read()
            .withProjectId(options.getProjectId())
            .withLiteralGqlQuery(options.getGqlQuery()));

    PCollection<String> jsons = entities.apply("EntityToJson", ParDo.of(new EntityToJson()));

    jsons.apply("JsonToGcs", TextIO.write().to(options.getGcsSavePath())
        .withSuffix(".json"));

    // Start the job
    PipelineResult pipelineResult = pipeline.run();
    if (options.getKeepJobsRunning()) {
      System.out.println("Blocking until done");
      try {
        System.out.println(pipelineResult.waitUntilFinish());
      } catch (Exception exc) {
        System.err.println(exc);
        pipelineResult.cancel();
      }
    }
  }
  interface Options extends PipelineOptions {
    @Validation.Required
    @Description("GCS path to write to, e.g. gs://mybucket/somepath/")
    ValueProvider<String> getGcsSavePath();
    void setGcsSavePath(ValueProvider<String> gcsSavePath);

    @Validation.Required
    @Description("GQL query that selects the Datastore Entities to export")
    ValueProvider<String> getGqlQuery();
    void setGqlQuery(ValueProvider<String> gqlQuery);

    @Validation.Required
    @Description("Project ID")
    ValueProvider<String> getProjectId();
    void setProjectId(ValueProvider<String> projectId);

    @Description("Block until the job finishes")
    @Default.Boolean(true)
    Boolean getKeepJobsRunning();
    void setKeepJobsRunning(Boolean keepJobsRunning);
  }
  /**
   * Converts each Datastore Entity into its Protobuf v3 JSON representation.
   */
  static class EntityToJson extends DoFn<Entity, String> {
    protected static JsonFormat.Printer mJsonPrinter = null;

    // Lazily builds a JsonFormat printer that knows the Entity descriptor and
    // emits compact (single-line) JSON with default-valued fields included.
    public static JsonFormat.Printer getPrinter() {
      if (mJsonPrinter == null) {
        TypeRegistry typeRegistry = TypeRegistry.newBuilder()
            .add(Entity.getDescriptor())
            .build();
        mJsonPrinter = JsonFormat.printer()
            .usingTypeRegistry(typeRegistry)
            .includingDefaultValueFields()
            .omittingInsignificantWhitespace();
      }
      return mJsonPrinter;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      Entity entity = c.element();
      c.output(getPrinter().print(entity));
    }
  }
}
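For reference, the printer above emits one compact JSON object per Entity, which is what makes the GCS output newline-delimited. Below is a minimal standalone sketch of that serialization; the class name and the entity contents are invented for illustration, but the printer configuration mirrors EntityToJson.getPrinter().

package com.google.cloud.dataflow.teleport;

import com.google.datastore.v1.Entity;
import com.google.datastore.v1.Value;
import com.google.protobuf.util.JsonFormat;
import com.google.protobuf.util.JsonFormat.TypeRegistry;

public class EntityJsonExample {
  public static void main(String[] args) throws Exception {
    // A made-up Entity with a single string property (illustration only)
    Entity entity = Entity.newBuilder()
        .putProperties("name", Value.newBuilder().setStringValue("Alice").build())
        .build();

    // Same printer settings as EntityToJson.getPrinter()
    JsonFormat.Printer printer = JsonFormat.printer()
        .usingTypeRegistry(TypeRegistry.newBuilder().add(Entity.getDescriptor()).build())
        .includingDefaultValueFields()
        .omittingInsignificantWhitespace();

    // Prints one compact JSON line, e.g. {"properties":{"name":{"stringValue":"Alice"}}}
    System.out.println(printer.print(entity));
  }
}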
Saving template to: /Datastore..to.GCS
Pipeline Args
--runner=DataflowRunner
--project=strong-moose
--stagingLocation=gs://strong-moose.appspot.com/staging/
--tempLocation=gs://strong-moose.appspot.com/temp/
--templateLocation=gs://strong-moose.appspot.com/templates3/
Jun 06, 2017 10:16:53 AM org.apache.beam.runners.dataflow.DataflowRunner fromOptions
INFO: PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 1 files. Enable logging at DEBUG level to see which files will be staged.
Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for IngestEntities/ParDo(GqlQueryTranslate)/ParMultiDo(GqlQueryTranslate).out0 [PCollection]. Correct one of the following root causes:
  No Coder has been manually specified; you may do so using .setCoder().
  Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder for com.google.datastore.v1.Query.
  Building a Coder using a registered CoderProvider failed.
  See suppressed exceptions for detailed failures.
  Using the default output Coder from the producing PTransform failed: Unable to provide a Coder for com.google.datastore.v1.Query.
  Building a Coder using a registered CoderProvider failed.
  See suppressed exceptions for detailed failures.
    at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:444)
    at org.apache.beam.sdk.values.PCollection.getCoder(PCollection.java:250)
    at org.apache.beam.sdk.values.PCollection.finishSpecifying(PCollection.java:104)
    at org.apache.beam.sdk.runners.TransformHierarchy.finishSpecifyingInput(TransformHierarchy.java:147)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:481)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:422)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:277)
    at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read.expand(DatastoreV1.java:581)
    at org.apache.beam.sdk.io.gcp.datastore.DatastoreV1$Read.expand(DatastoreV1.java:226)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:482)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:441)
    at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:56)
    at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:179)
    at com.google.cloud.dataflow.teleport.DatastoreToGcs.main(DatastoreToGcs.java:47)
    at com.google.cloud.dataflow.teleport.Main.main(Main.java:50)
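The failure above is that the CoderRegistry cannot infer a Coder for com.google.datastore.v1.Query, which DatastoreIO's internal GqlQueryTranslate step produces when the GQL query is supplied as a ValueProvider (the template-creation case). One possible workaround, sketched below as an assumption rather than a verified fix, is to register a ProtoCoder for Query on the pipeline's CoderRegistry before applying the Datastore read. ProtoCoder comes from the beam-sdks-java-extensions-protobuf dependency already declared in the build file; the RegisterQueryCoder class name is invented for illustration.

package com.google.cloud.dataflow.teleport;

import com.google.datastore.v1.Query;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class RegisterQueryCoder {
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    Pipeline pipeline = Pipeline.create(options);

    // Tell the CoderRegistry how to encode com.google.datastore.v1.Query,
    // the type the stack trace reports as having no inferable Coder.
    pipeline.getCoderRegistry()
        .registerCoderForClass(Query.class, ProtoCoder.of(Query.class));

    // ... then apply DatastoreIO.v1().read() and the rest of the pipeline,
    // as in DatastoreToGcs.main above.
  }
}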