Skip to content

Instantly share code, notes, and snippets.

@izhar
Created May 10, 2016 09:14
Show Gist options
Save izhar/3b22cab626b9daf0837323ac7abb295d to your computer and use it in GitHub Desktop.
// Streaming loader: reads JSON messages from the Pub/Sub topic TOPIC, converts each one with
// JsonToRowConverter and appends the resulting rows to the BigQuery table table_name
// (TOPIC, table_name and JsonToRowConverter are defined in the elided parts of this snippet).
public class Raw2Bigquery {
...
...
// Builds and runs the Pub/Sub -> BigQuery pipeline.
public static void main(String[] args) {
// NOTE(review): options come from PipelineOptionsFactory.create(), so `args` is never parsed.
// Presumably the elided lines below set the options programmatically; otherwise consider
// PipelineOptionsFactory.fromArgs(args), as ActivityExtractor.main does.
DataflowPipelineOptions options = PipelineOptionsFactory.create()
.as(DataflowPipelineOptions.class);
...
Pipeline mPipeline = Pipeline.create(options);
...
// Apply the pipeline's transforms.
mPipeline
// NOTE(review): idLabel("kind_entity_id") asks PubsubIO to deduplicate on a Pub/Sub message
// *attribute* of that name (confirm against the PubsubIO.Read docs). The publisher in
// ActivityExtractor puts "kind_entity_id" inside the JSON message body, not into the message
// attributes, which is presumably why duplicates are not being removed.
.apply(PubsubIO.Read.idLabel("kind_entity_id").topic(TOPIC).withCoder(TableRowJsonCoder.of()).named("ReadFromRawIn"))
.apply(ParDo.of(new JsonToRowConverter()))
// Append to table_name, creating it from JsonToRowConverter.getSchema() if it does not exist.
.apply(BigQueryIO.Write
.named("WriteRawToBQ")
.to(table_name)
.withSchema(JsonToRowConverter.getSchema())
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));
mPipeline.run();
}
}
/**
 * Batch pipeline that reads entities from Datastore, re-packages selected fields as JSON,
 * tags each message with a "kind-entity_id" key and publishes the result to a Pub/Sub topic.
 */
public class ActivityExtractor {

  /** Pipeline options; the getters/setters (namespace, dataset, topic, ...) are elided in this snippet. */
  public static interface Options extends PipelineOptions {
    GET SET OPTIONS HERE
  }

  /**
   * Wraps each JSON payload as {"attributes": {"kind_entity_id": "<kind>-<entity_id>"}, "data": <payload>}.
   *
   * <p>Whitespace is stripped from the generated identifier. Elements that are not a JSON object,
   * or that lack a "kind" or "entity_id" field, are logged and dropped. Previously a parse failure
   * still emitted an empty "{}" message, and a missing entity_id produced the colliding id
   * "<kind>-null".
   *
   * <p>NOTE(review): the "attributes" object built here is part of the JSON message body, not the
   * Pub/Sub message attributes that PubsubIO's idLabel deduplication reads (confirm against the
   * PubsubIO docs). So this key presumably cannot drive deduplication on the reading side.
   */
  static class PopulateUniqueKey extends DoFn<String, String> {
    @Override
    public void processElement(ProcessContext c) {
      // Expected input, as produced by FormatToCanonical:
      // {"raw_payload": "...", "created_on": 1462868124000000, "kind": "UserAction",
      //  "action": "tag_operation", "entity_id": 5518382553366528, "location_id": "4664"}
      Object parsed;
      try {
        parsed = new JSONParser().parse(c.element());
      } catch (org.json.simple.parser.ParseException pe) {
        // Drop the element: emitting an empty object would publish a meaningless message.
        LOG.error("Failed to parse incoming payload: " + pe, pe);
        return;
      }
      if (!(parsed instanceof JSONObject)) {
        LOG.error("Incoming payload is not a JSON object; dropping element");
        return;
      }
      JSONObject incomingPayload = (JSONObject) parsed;

      Object kind = incomingPayload.get("kind");
      Object entityId = incomingPayload.get("entity_id");
      if (kind == null || entityId == null) {
        // Without both parts the key is either uncomputable or collides across entities.
        LOG.error("Payload is missing 'kind' or 'entity_id'; dropping element");
        return;
      }
      String idLabel = (String.valueOf(kind) + "-" + String.valueOf(entityId)).replaceAll("\\s", "");

      Map<String, String> attributes = new HashMap<String, String>();
      attributes.put("kind_entity_id", idLabel);

      JSONObject outgoingPayload = new JSONObject();
      outgoingPayload.put("attributes", attributes);
      outgoingPayload.put("data", incomingPayload);
      // Resulting shape:
      // {"data": {...incoming payload...}, "attributes": {"kind_entity_id": "UserAction-5369656291622912"}}
      c.output(outgoingPayload.toString());
    }
  }

  /** Extracts selected fields from a Datastore entity and re-packages them as a JSON string. */
  static class FormatToCanonical extends DoFn<Entity, String> {
    @Override
    public void processElement(ProcessContext c) {
      // Extract entity_id, created_on, kind, action and location_id from the Datastore entity here
      /// ...
      // Re-package as JSON; the full entity text is kept under "raw_payload".
      JSONObject jo = new JSONObject();
      jo.put("entity_id", entity_id);
      jo.put("created_on", created_on);
      jo.put("kind", kind);
      jo.put("action", action);
      jo.put("location_id", location_id);
      jo.put("raw_payload", c.element().toString());
      c.output(jo.toString());
    }
  }

  /**
   * Runs the Datastore -> JSON -> Pub/Sub pipeline configured by {@code options}.
   *
   * @param options supplies the namespace, dataset and target topic (plus whatever
   *     makeAncestorKindQuery reads)
   */
  public static void readFromDatastoreAndPublish(Options options) {
    Query query = makeAncestorKindQuery(options);
    DatastoreIO.Source source = DatastoreIO.source()
        .withNamespace(options.getNamespace())
        .withDataset(options.getDataset())
        .withQuery(query);
    Pipeline p = Pipeline.create(options);
    p
        .apply("ReadFromDatastore", Read.from(source))
        // Extract some fields from the Datastore object, and wrap in json
        .apply("FormatToCanonical", ParDo.of(new FormatToCanonical()))
        // Add the unique event identifier into the 'attributes' object of the message body
        .apply("PopulateUniqueKey", ParDo.of(new PopulateUniqueKey()))
        // NOTE(review): confirm in the PubsubIO.Write docs what value idLabel publishes under the
        // "kind_entity_id" attribute. If the writer generates its own unique id per record, rather
        // than using the key computed above, re-published entities will not be deduplicated by the reader.
        .apply(PubsubIO.Write.idLabel("kind_entity_id").named("PublishToRawLoader").topic(options.getTopic()));
    p.run();
  }

  /** Parses and validates command-line options, then runs the pipeline. */
  public static void main(String args[]) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    ActivityExtractor.readFromDatastoreAndPublish(options);
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment