Skip to content

Instantly share code, notes, and snippets.

@mbbhalla
Created September 19, 2019 16:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mbbhalla/02cad9dda62e2dee4808bcc3d5670a18 to your computer and use it in GitHub Desktop.
Save mbbhalla/02cad9dda62e2dee4808bcc3d5670a18 to your computer and use it in GitHub Desktop.
// Flink streaming job: reads raw tweet JSON strings from TwitterSource, keys the tweets by
// their "lang" field and, for every one-minute window, logs (lang, tweet count, window end).
final ObjectMapper mapper = new ObjectMapper();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

// Twitter API credentials (masked in this snippet).
// NOTE(review): supply these from external configuration rather than source code.
final Properties props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "******************");
props.setProperty(TwitterSource.CONSUMER_SECRET, "*********************");
props.setProperty(TwitterSource.TOKEN, "*******************");
props.setProperty(TwitterSource.TOKEN_SECRET, "*****************");

env.addSource(new TwitterSource(props))
    // Keep only messages whose JSON text mentions a "created_at" field
    // (presumably to drop non-tweet messages such as delete notices; verify).
    .filter(tweetJsonString -> tweetJsonString.contains("\"created_at\""))
    // Parse the JSON and keep just the language and the timestamp.
    // NOTE(review): JsonNode.get() returns null when a field is absent, so a message
    // without "lang" or "timestamp_ms" fails the job with a NullPointerException.
    .map(tweetJsonString -> {
        JsonNode jsonNode = mapper.readTree(tweetJsonString);
        return Tweet.builder()
            .lang(jsonNode.get("lang").asText())
            .timestamp(Instant.ofEpochMilli(jsonNode.get("timestamp_ms").asLong()))
            .build();
    })
    .keyBy(Tweet::getLang)
    // One-minute windows per language. The time characteristic is not set here, so the
    // environment's default applies; the parsed tweet timestamp is not used for windowing.
    .timeWindow(Time.minutes(1L))
    .apply(new WindowFunction<Tweet, Tuple3<String, Long, Date>, String, TimeWindow>() {
        /**
         * Emits one tuple per (language, window): the language key, the number of tweets
         * in the window, and the window's end time.
         *
         * @param lang   key of the window (tweet language)
         * @param window the window being evaluated
         * @param input  tweets assigned to this key and window
         * @param out    collector receiving the (lang, count, window end) tuple
         */
        @Override
        public void apply(String lang,
                          TimeWindow window,
                          Iterable<Tweet> input,
                          Collector<Tuple3<String, Long, Date>> out) throws Exception {
            // Count sequentially: the Iterable's default spliterator is unsized and splits
            // poorly, and a parallel stream would read the window contents from
            // common-pool threads for no gain.
            final long tweetCount = StreamSupport.stream(input.spliterator(), false).count();
            out.collect(Tuple3.of(
                lang,
                tweetCount,
                Date.from(Instant.ofEpochMilli(window.getEnd()))
            ));
        }
    })
    .addSink(new SinkFunction<Tuple3<String, Long, Date>>() {
        /**
         * Logs each (lang, count, window end) tuple produced by the window function.
         *
         * @param value   the tuple to log
         * @param context sink context supplied by Flink (unused)
         */
        @Override
        @SuppressWarnings("rawtypes")
        public void invoke(Tuple3<String, Long, Date> value, Context context) throws Exception {
            log.info("Blah: {}", value);
        }
    });

// Builds and submits the job graph; nothing above runs until this call.
env.execute("Twitter keyBy Lang window count");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment