Created May 10, 2016 09:14
Save izhar/3b22cab626b9daf0837323ac7abb295d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class Raw2Bigquery { | |
... | |
... | |
public static void main(String[] args) { | |
DataflowPipelineOptions options = PipelineOptionsFactory.create() | |
.as(DataflowPipelineOptions.class); | |
... | |
Pipeline mPipeline = Pipeline.create(options); | |
... | |
// Apply the pipeline's transforms. | |
mPipeline | |
.apply(PubsubIO.Read.idLabel("kind_entity_id").topic(TOPIC).withCoder(TableRowJsonCoder.of()).named("ReadFromRawIn")) | |
.apply(ParDo.of(new JsonToRowConverter())) | |
.apply(BigQueryIO.Write | |
.named("WriteRawToBQ") | |
.to(table_name) | |
.withSchema(JsonToRowConverter.getSchema()) | |
.withWriteDisposition(WriteDisposition.WRITE_APPEND) | |
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)); | |
mPipeline.run(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class ActivityExtractor { | |
public static interface Options extends PipelineOptions { | |
GET SET OPTIONS HERE | |
} | |
static class PopulateUniqueKey extends DoFn<String,String> { | |
@Override | |
public void processElement(ProcessContext c) { | |
// Extract 'kind' & 'entity_id' from the payload, concat, | |
// and store as a value of "kind_entity_id" in the "attributes" object | |
JSONParser parser = new JSONParser(); | |
JSONObject incoming_payload = new JSONObject(); | |
JSONObject outgoing_payload = new JSONObject(); | |
Map<String,String> attributes = new HashMap<String , String>(); | |
try{ | |
incoming_payload = (JSONObject)parser.parse(c.element()); | |
// The incoming payload at this point (as seen ion the dataflow logs): | |
// { | |
// "raw_payload": "the entire DS data as read from previous transform", | |
// "created_on": 1462868124000000, | |
// "kind": "UserAction", | |
// "action": "tag_operation", | |
// "entity_id": 5518382553366528, | |
// "location_id": "4664", | |
// "attributes": { | |
// "kind_entity_id": "UserAction-5518382553366528" | |
// } | |
// } | |
String idLabel = (String)(incoming_payload.get("kind")); | |
idLabel = idLabel.concat("-"); | |
idLabel = idLabel.concat(String.valueOf(incoming_payload.get("entity_id"))); | |
idLabel = idLabel.replaceAll("\\s",""); | |
attributes.put("kind_entity_id",idLabel); | |
outgoing_payload.put("attributes",attributes); | |
outgoing_payload.put("data",incoming_payload); | |
}catch(org.json.simple.parser.ParseException pe){ | |
LOG.error(pe.toString()); | |
} | |
// The outgoing payload at this point (as seen ion the dataflow logs): | |
// { | |
// "data": { | |
// "raw_payload": "", | |
// "created_on": 1462868124000000, | |
// "kind": "UserAction", | |
// "action": "tag_operation", | |
// "entity_id": 5369656291622912, | |
// "location_id": "4664", | |
// "attributes": { | |
// "kind_entity_id": "UserAction-5369656291622912" | |
// } | |
// }, | |
// "attributes": { | |
// "kind_entity_id": "UserAction-5369656291622912" | |
// } | |
// NOTE: I added 'attributes' to the 'data' object as well | |
// 'just in case..', but it obviously did not help | |
c.output(outgoing_payload.toString()); | |
} | |
} | |
static class FormatToCanonical extends DoFn<Entity, String> { | |
@Override | |
public void processElement(ProcessContext c) { | |
// Extract some fields from the raw Datastore payload | |
// and re-packages in a json format | |
// Exrtract values from datastore here | |
/// ... | |
// re-package in json | |
JSONObject jo = new JSONObject(); | |
jo.put("entity_id" ,entity_id); | |
jo.put("created_on" ,created_on); | |
jo.put("kind" ,kind); | |
jo.put("action" ,action); | |
jo.put("location_id",location_id); | |
jo.put("raw_payload",c.element().toString()); | |
// send re-packaged payload | |
c.output(jo.toString()); | |
} | |
} | |
public static void readFromDatastoreAndPublish(Options options) { | |
Query query = makeAncestorKindQuery(options); | |
DatastoreIO.Source source = DatastoreIO.source() | |
.withNamespace(options.getNamespace()) | |
.withDataset(options.getDataset()) | |
.withQuery(query); | |
Pipeline p = Pipeline.create(options); | |
p | |
.apply("ReadFromDatastore",Read.from(source)) | |
// Extract some fields from the Datastore object, and wrap in json | |
.apply("FormatToCanonical",ParDo.of(new FormatToCanonical())) | |
// Add the unique event identifier into the 'attributes' object | |
.apply("PopulateUniqueKey" ,ParDo.of(new PopulateUniqueKey())) | |
// Publish | |
.apply(PubsubIO.Write.idLabel("kind_entity_id").named("PublishToRawLoader").topic(options.getTopic())); | |
// Tried without setting idLabel in the publisher as well | |
//.apply(PubsubIO.Write.named("PublishToRawLoader").topic(options.getTopic())); | |
p.run(); | |
} | |
public static void main(String args[]) { | |
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); | |
ActivityExtractor.readFromDatastoreAndPublish(options); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment