Skip to content

Instantly share code, notes, and snippets.

@e3oroush
Last active July 13, 2019 17:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save e3oroush/57b1c3b12904bc8bae7de2462f469569 to your computer and use it in GitHub Desktop.
Save e3oroush/57b1c3b12904bc8bae7de2462f469569 to your computer and use it in GitHub Desktop.
Time-window aggregation followed by CEP pattern matching in Flink
// Counts the Sensor events per device id in one-minute tumbling processing-time windows.
// Each emitted Tuple3 holds:
//   f0 = device id,
//   f1 = timestamp of the first event added to the accumulator,
//   f2 = number of events counted.
SingleOutputStreamOperator<Tuple3<Integer, Long, Integer>> aggregatedMatch = result
        .keyBy(new KeySelector<Sensor, Integer>() {
            @Override
            public Integer getKey(Sensor sensor) throws Exception {
                return sensor.getDeviceId();
            }
        })
        .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
        .aggregate(new AggregateFunction<Sensor, Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>>() {
            /**
             * Creates an empty accumulator. Emptiness is signalled by a count (f2) of 0
             * rather than by a magic device id, so a real device id of -1 cannot be
             * mistaken for "not yet initialised". All fields are non-null so the
             * accumulator can always be unboxed safely.
             */
            @Override
            public Tuple3<Integer, Long, Integer> createAccumulator() {
                Tuple3<Integer, Long, Integer> acc = new Tuple3<>();
                acc.f0 = -1;   // placeholder, overwritten by the first add()
                acc.f1 = -1L;  // placeholder, overwritten by the first add()
                acc.f2 = 0;
                return acc;
            }

            /**
             * Records one sensor event: the first event fixes the device id and
             * timestamp, every event increments the count.
             */
            @Override
            public Tuple3<Integer, Long, Integer> add(Sensor sensor, Tuple3<Integer, Long, Integer> acc) {
                if (acc.f2 == 0) {
                    acc.f0 = sensor.getDeviceId();
                    acc.f1 = sensor.getTimestamp();
                }
                acc.f2++;
                return acc;
            }

            /** Returns the accumulator unchanged as the window result. */
            @Override
            public Tuple3<Integer, Long, Integer> getResult(Tuple3<Integer, Long, Integer> acc) {
                return acc;
            }

            /**
             * Combines two accumulators. If one side is still empty the other is
             * returned as-is, so the device id and timestamp are never lost; otherwise
             * the counts are summed into the second accumulator, which keeps its own
             * device id and timestamp.
             */
            @Override
            public Tuple3<Integer, Long, Integer> merge(Tuple3<Integer, Long, Integer> acc0, Tuple3<Integer, Long, Integer> acc1) {
                if (acc0.f2 == 0) {
                    return acc1;
                }
                if (acc1.f2 == 0) {
                    return acc0;
                }
                acc1.f2 += acc0.f2;
                return acc1;
            }
        });
// Applies a single-step CEP pattern named "aggs" to the aggregated stream: an aggregate
// matches when its count (f2) is greater than 4. For every match, the (f0, f1) pair of the
// first matched aggregate is emitted.
DataStream<Tuple2<Integer, Long>> aggregatedResult = CEP
        .pattern(
                aggregatedMatch,
                Pattern.<Tuple3<Integer, Long, Integer>>begin("aggs")
                        .where(new SimpleCondition<Tuple3<Integer, Long, Integer>>() {
                            @Override
                            public boolean filter(Tuple3<Integer, Long, Integer> aggregate) throws Exception {
                                return aggregate.f2 > 4;
                            }
                        }))
        .process(new PatternProcessFunction<Tuple3<Integer, Long, Integer>, Tuple2<Integer, Long>>() {
            @Override
            public void processMatch(Map<String, List<Tuple3<Integer, Long, Integer>>> match, Context ctx, Collector<Tuple2<Integer, Long>> out) throws Exception {
                // Look the matched aggregate up once and project it down to (f0, f1).
                Tuple3<Integer, Long, Integer> first = match.get("aggs").get(0);
                Tuple2<Integer, Long> alert = new Tuple2<>(first.f0, first.f1);
                out.collect(alert);
            }
        });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment