Last active
July 13, 2019 17:25
-
-
Save e3oroush/57b1c3b12904bc8bae7de2462f469569 to your computer and use it in GitHub Desktop.
Time-window aggregation with CEP in Flink
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SingleOutputStreamOperator<Tuple3<Integer, Long, Integer>> aggregatedMatch = result.keyBy(new KeySelector<Sensor, Integer>() { | |
@Override | |
public Integer getKey(Sensor sensor) throws Exception { | |
return sensor.getDeviceId(); | |
} | |
}).window(TumblingProcessingTimeWindows.of(Time.minutes(1))) | |
.aggregate(new AggregateFunction<Sensor, Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>>() { | |
@Override | |
public Tuple3<Integer, Long, Integer> createAccumulator() { | |
Tuple3<Integer, Long, Integer> acc = new Tuple3<>(); | |
acc.f0 = -1; | |
return acc; | |
} | |
@Override | |
public Tuple3<Integer, Long, Integer> add(Sensor sensor, Tuple3<Integer, Long, Integer> integerLongtuple2) { | |
if(integerLongtuple2.f0 == -1){ | |
integerLongtuple2.f0 = sensor.getDeviceId(); | |
integerLongtuple2.f1 = sensor.getTimestamp(); | |
integerLongtuple2.f2 = 0; | |
} | |
integerLongtuple2.f2++; | |
return integerLongtuple2; | |
} | |
@Override | |
public Tuple3<Integer, Long, Integer> getResult(Tuple3<Integer, Long, Integer> integerLongtuple2) { | |
return integerLongtuple2; | |
} | |
@Override | |
public Tuple3<Integer, Long, Integer> merge(Tuple3<Integer, Long, Integer> integerLongtuple2, Tuple3<Integer, Long, Integer> acc1) { | |
acc1.f2 += integerLongtuple2.f2; | |
return acc1; | |
} | |
}); | |
DataStream<Tuple2<Integer,Long>> aggregatedResult = CEP.pattern(aggregatedMatch, | |
Pattern.<Tuple3<Integer,Long, Integer>>begin("aggs") | |
.where(new SimpleCondition<Tuple3<Integer, Long, Integer>>() { | |
@Override | |
public boolean filter(Tuple3<Integer, Long, Integer> integerLongTuple3) throws Exception { | |
return integerLongTuple3.f2 > 4; | |
} | |
})).process(new PatternProcessFunction<Tuple3<Integer, Long, Integer>, Tuple2<Integer, Long>>() { | |
@Override | |
public void processMatch(Map<String, List<Tuple3<Integer, Long, Integer>>> map, Context context, Collector<Tuple2<Integer, Long>> collector) throws Exception { | |
Tuple2<Integer, Long> result = new Tuple2<>(); | |
result.f0 = map.get("aggs").get(0).f0; | |
result.f1 = map.get("aggs").get(0).f1; | |
collector.collect(result); | |
} | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment