@liorksh
Created April 11, 2020 07:09
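import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;

// Convert each record to a Tuple2 of (name, score)
DataStream<Tuple2<String, Integer>> userCounts = inputDataObjectStream
    .map(new MapFunction<InputData, Tuple2<String, Integer>>() {
        @Override
        public Tuple2<String, Integer> map(InputData item) {
            return new Tuple2<>(item.getName(), item.getScore());
        }
    })
    .returns(Types.TUPLE(Types.STRING, Types.INT))
    .keyBy(0) // returns KeyedStream<T, Tuple> keyed by the first tuple field (the 'name' field)
    //.timeWindowAll(Time.seconds(windowInterval)) // DO NOT use timeWindowAll for a key-based stream
    .timeWindow(Time.seconds(2)) // returns WindowedStream<T, KEY, TimeWindow>
    // Within each 2-second window, per key: concatenate the names and sum the scores
    .reduce((x, y) -> new Tuple2<>(x.f0 + "-" + y.f0, x.f1 + y.f1));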
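
To see what the keyed reduce above emits for a single window, here is a minimal sketch in plain Java with no Flink dependency. It is an illustration only: `KeyedReduceSketch`, `NameScore`, `combine`, and `reduceByName` are hypothetical names, not part of Flink or of the snippet, and the list passed in stands in for the records that fall into one 2-second window.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyedReduceSketch {

    // Hypothetical stand-in for Flink's Tuple2<String, Integer> (f0 = name, f1 = score).
    static final class NameScore {
        final String name;
        final int score;

        NameScore(String name, int score) {
            this.name = name;
            this.score = score;
        }
    }

    // Same combining logic as the reduce lambda: join the names with "-" and add the scores.
    static NameScore combine(NameScore x, NameScore y) {
        return new NameScore(x.name + "-" + y.name, x.score + y.score);
    }

    // Groups records by their original name (the role of keyBy) and folds each
    // group pairwise (the role of reduce on one window's contents).
    static Map<String, NameScore> reduceByName(List<NameScore> window) {
        Map<String, NameScore> result = new LinkedHashMap<>();
        for (NameScore item : window) {
            result.merge(item.name, item, KeyedReduceSketch::combine);
        }
        return result;
    }

    public static void main(String[] args) {
        List<NameScore> window = Arrays.asList(
            new NameScore("alice", 3),
            new NameScore("bob", 2),
            new NameScore("alice", 5));

        for (NameScore r : reduceByName(window).values()) {
            System.out.println(r.name + " " + r.score);
        }
        // prints:
        // alice-alice 8
        // bob 2
    }
}
```

Because every record in a group shares the same name, the concatenated first field only repeats the key (for example `alice-alice`); the summed score is the informative part of the result.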