Skip to content

Instantly share code, notes, and snippets.

@izhar
Last active July 6, 2016 11:23
Show Gist options
  • Save izhar/ea78fb53518d9b2d7045cef7017596b8 to your computer and use it in GitHub Desktop.
Save izhar/ea78fb53518d9b2d7045cef7017596b8 to your computer and use it in GitHub Desktop.
como-datastore-read
import com.google.cloud.dataflow.sdk.io.Read;
// ... (remaining imports and the enclosing class declaration are omitted from this snippet)
static class FormatToCanonical extends DoFn<Entity, String> {
@Override
public void processElement(ProcessContext c) {
create a json payload which consists of
some data extracted from the DS Entity
and the original Entity data in a 'raw_payload' field
}
}
static class PopulatePubsubAttributes extends DoFn<String,String> {
@Override
public void processElement(ProcessContext c) {
Add pubsub attributes for next stages to use
}
}
/**
 * Builds and runs the pipeline: read entities from Datastore, pass them through
 * {@link FormatToCanonical} and {@link PopulatePubsubAttributes}, then publish to a
 * Pub/Sub topic.
 *
 * @param options pipeline options; supplies the namespace, dataset, project, flow version
 *                and outgoing topic used to configure the source and the sink
 */
public static void readFromDatastore(Options options) {
    final Query datastoreQuery = generateQuery(options);
    LOG.info("Query: " + datastoreQuery.toString());

    // Destination topic is derived from the topic name, project and flow version.
    final String topicUri = constructTopicURI(
            options.getOutgoingTopic(),
            options.getProject(),
            options.getFlowVersion());

    final DatastoreIO.Source entitySource = DatastoreIO.source()
            .withNamespace(options.getNamespace())
            .withDataset(options.getDataset())
            .withQuery(datastoreQuery);

    final Pipeline pipeline = Pipeline.create(options);
    pipeline
            .apply("ReadFromDatastore", Read.from(entitySource))
            .apply("FormatToCanonical", ParDo.of(new FormatToCanonical()))
            .apply("PopulatePubsubAttributes", ParDo.of(new PopulatePubsubAttributes()))
            // "created_on" is the attribute name handed to the sink as its timestamp label.
            .apply("PublishToRawTopic",
                    PubsubIO.Write.timestampLabel("created_on").topic(topicUri));
    pipeline.run();
}
/**
 * Entry point: parses and validates the command-line options, rewrites the staging
 * location through {@code constructStorageURI}, sets the application name to
 * {@code "ActivityExtractor"} followed by the flow version, and starts the pipeline.
 *
 * @param args command-line arguments, parsed into {@code Options}
 */
public static void main(String[] args) {
    final Options options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    // Replace the raw staging location with its constructed storage URI.
    options.setStagingLocation(constructStorageURI(options.getStagingLocation()));

    // Application name = fixed prefix + flow version.
    final String appName = "ActivityExtractor".concat(options.getFlowVersion());
    options.setAppName(appName);

    ActivityExtractor.readFromDatastore(options);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment