Skip to content

Instantly share code, notes, and snippets.

@sAbakumoff
Created September 26, 2016 14:11
Show Gist options
  • Save sAbakumoff/1c01da26ae2c0fcbf4696cc154de9477 to your computer and use it in GitHub Desktop.
Save sAbakumoff/1c01da26ae2c0fcbf4696cc154de9477 to your computer and use it in GitHub Desktop.
// Streaming pipeline: reads tweet JSON strings from the Pub/Sub topic below, annotates each tweet's
// "text" with Cloud Natural Language sentiment (polarity/magnitude) and syntax (part-of-speech
// tokens), and appends one row per message to the BigQuery table from getTableReference(), creating
// it with getSchema() if needed.
//
// NOTE(review): an anonymous DoFn captures the enclosing instance when this code runs inside an
// instance method; if that enclosing class is not Serializable, job submission fails. Move the DoFn
// to a static nested class if that happens.
pipeline
    // Element timestamps are taken from the "created_at" Pub/Sub message attribute.
    .apply(PubsubIO.Read.timestampLabel("created_at")
        .topic("projects/in-full-gear/topics/debate_tweets"))
    .apply(ParDo.of(new DoFn<String, TableRow>() {
      // Built lazily on first use and reused for every later element handled by this DoFn
      // instance, instead of constructing a new parser and Language API client per tweet.
      // Transient because DoFns are serialized and shipped to workers; each worker-side copy builds
      // its own. NOTE(review): assumes the SDK does not invoke one DoFn instance from several
      // threads concurrently; confirm against the DoFn contract for the SDK version in use.
      private transient JSONParser parser;
      private transient Analyze analyze;

      /**
       * Converts one raw tweet JSON string into a BigQuery row.
       *
       * <p>On success the row holds the original payload in {@code tweet_object}. If the payload
       * has a non-blank string {@code text} field, the row also holds {@code syntax} (a JSON
       * array of {partOfSpeech, content} tokens), {@code polarity} and {@code magnitude}.
       * Payloads that are not JSON objects, or that have no usable {@code text}, are written with
       * {@code tweet_object} only and no API calls are made. On a parse, IO or credential error,
       * {@code tweet_object} holds the exception text instead.
       */
      @Override
      public void processElement(ProcessContext c) {
        String payload = c.element();
        TableRow row = new TableRow();
        try {
          if (parser == null) {
            parser = new JSONParser();
          }
          Object parsed = parser.parse(payload);
          // Guard both casts: non-object payloads or a missing/non-string "text" field would
          // otherwise throw uncaught runtime exceptions and fail the whole bundle.
          Object textField = (parsed instanceof JSONObject)
              ? ((JSONObject) parsed).get("text")
              : null;
          if (textField instanceof String && !((String) textField).trim().isEmpty()) {
            String text = (String) textField;
            if (analyze == null) {
              // Initialisation failures are caught below and retried on the next element,
              // matching the original behaviour of building the client per element.
              analyze = new Analyze(Analyze.getLanguageService());
            }
            Sentiment sentiment = analyze.analyzeSentiment(text);
            List<Token> tokens = analyze.analyzeSyntax(text);
            row.set("tweet_object", payload)
                .set("syntax", toJsonTokens(tokens).toJSONString())
                .set("polarity", sentiment.getPolarity())
                .set("magnitude", sentiment.getMagnitude());
          } else {
            row.set("tweet_object", payload);
          }
        } catch (ParseException | IOException | GeneralSecurityException e) {
          row.set("tweet_object", e.toString());
        }
        c.output(row);
      }

      /** Serializes syntax tokens as a JSON array of {partOfSpeech, content} objects. */
      @SuppressWarnings("unchecked") // json-simple's JSONObject/JSONArray are raw collections.
      private JSONArray toJsonTokens(List<Token> tokens) {
        JSONArray jsonTokens = new JSONArray();
        for (Token token : tokens) {
          JSONObject jsonToken = new JSONObject();
          jsonToken.put("partOfSpeech", token.getPartOfSpeech().getTag());
          jsonToken.put("content", token.getText().getContent());
          jsonTokens.add(jsonToken);
        }
        return jsonTokens;
      }
    }))
    .apply(BigQueryIO.Write.to(getTableReference())
        .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
        .withWriteDisposition(WriteDisposition.WRITE_APPEND)
        .withSchema(getSchema()));
pipeline.run();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment