@e3oroush
Created July 13, 2019 17:19
simple match expression
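// Imports used by this snippet (Flink CEP). Sensor is a user-defined class and
// inputStream is a DataStream<Sensor>; both are defined elsewhere.
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.Collector;
import java.util.List;
import java.util.Map;

// Define a simple pattern and condition to detect in the data stream:
// match any event whose temperature exceeds 60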
Pattern<Sensor, ?> highTempPattern = Pattern.<Sensor>begin("first")
        .where(new SimpleCondition<Sensor>() {
            @Override
            public boolean filter(Sensor sensor) throws Exception {
                return sensor.getTemperature() > 60;
            }
        });
// Apply the CEP pattern to the input stream keyed by device ID and emit,
// for each match, the event captured under the name "first"
DataStream<Sensor> result = CEP.pattern(
        inputStream.keyBy(new KeySelector<Sensor, Integer>() {
            @Override
            public Integer getKey(Sensor sensor) throws Exception {
                return sensor.getDeviceId();
            }
        }),
        highTempPattern)
        .process(new PatternProcessFunction<Sensor, Sensor>() {
            @Override
            public void processMatch(Map<String, List<Sensor>> map, Context context, Collector<Sensor> collector) throws Exception {
                collector.collect(map.get("first").get(0));
            }
        });
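
The snippet relies on a Sensor class that is not shown. The following is a minimal sketch of what it might look like, inferred only from the two getters the snippet calls. The field types (int for the device ID, double for the temperature), the constructors, and the setters are assumptions, not the author's actual class.

```java
// Hypothetical Sensor class; only getDeviceId() and getTemperature() are
// known from the snippet. Field types and constructors are assumptions.
// In a real project, declare the class public in its own Sensor.java so
// that Flink can treat it as a POJO.
class Sensor {
    private int deviceId;        // assumed int; the snippet keys the stream by Integer
    private double temperature;  // assumed double; the snippet compares it with 60

    // Flink's POJO rules expect a public no-argument constructor
    public Sensor() {
    }

    public Sensor(int deviceId, double temperature) {
        this.deviceId = deviceId;
        this.temperature = temperature;
    }

    public int getDeviceId() {
        return deviceId;
    }

    public void setDeviceId(int deviceId) {
        this.deviceId = deviceId;
    }

    public double getTemperature() {
        return temperature;
    }

    public void setTemperature(double temperature) {
        this.temperature = temperature;
    }
}
```

With this shape, new Sensor(7, 61.0) would satisfy the "first" condition above, while new Sensor(7, 60.0) would not, because the comparison is strictly greater than 60.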