I hereby claim:
- I am e-soroush on github.
- I am esoroush (https://keybase.io/esoroush) on keybase.
- I have a public key ASBBUdHyOwc0_8Hc3Ogu6Tghwfl1GoxyaCyXvIP92H_UBgo
To claim this, I am signing this object:
# | |
# Author: Ebi: ebrhim.soroush@gmail.com | |
# Requires mediainfo and libxml-xpath-perl. Install them, then either source this file or copy this script into your ~/.bashrc. | |
# sudo apt install mediainfo libxml-xpath-perl | |
# | |
# read_dom: read one chunk of standard input, up to the next '<' character,
# and split it into the variables ENTITY and CONTENT.
#
# IFS is set to '>' for the duration of the function only (it is declared
# local), so `read` splits the chunk at '>': the text before the first '>'
# goes to ENTITY and the remainder goes to CONTENT.
# The function's exit status is that of `read`, the last command it runs.
# NOTE(review): appears intended to be called in a loop over XML-like input,
# where ENTITY would be a tag and CONTENT the text that follows it; confirm against callers.
read_dom () { | |
local IFS=\> | |
read -d \< ENTITY CONTENT | |
}; |
SingleOutputStreamOperator<Tuple3<Integer, Long, Integer>> aggregatedMatch = result.keyBy(new KeySelector<Sensor, Integer>() { | |
@Override | |
public Integer getKey(Sensor sensor) throws Exception { | |
return sensor.getDeviceId(); | |
} | |
}).window(TumblingProcessingTimeWindows.of(Time.minutes(1))) | |
.aggregate(new AggregateFunction<Sensor, Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>>() { | |
@Override | |
public Tuple3<Integer, Long, Integer> createAccumulator() { | |
Tuple3<Integer, Long, Integer> acc = new Tuple3<>(); |
// Define a single-step CEP pattern (step name "first") that matches Sensor events whose temperature is greater than 60 | |
Pattern<Sensor,?> highTempPattern = Pattern.<Sensor>begin("first").where(new SimpleCondition<Sensor>() { | |
@Override | |
public boolean filter(Sensor sensor) throws Exception { | |
return sensor.getTemperature() > 60; | |
} | |
}); | |
// obtain the resulting data stream by applying the defined CEP pattern to the input data stream | |
DataStream<Sensor> result = CEP.pattern(inputStream.keyBy(new KeySelector<Sensor, Integer>() { | |
@Override |
// set up the streaming execution environment | |
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
// set time characteristic to be event timestamp | |
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); | |
// use a fictitious stream of sensor data as the data source and extract its timestamp field | |
DataStream<Sensor> inputStream = env.addSource(new TemperatureSensor()) | |
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Sensor>() { | |
@Override | |
public long extractAscendingTimestamp(Sensor sensor) { | |
return sensor.getTimestamp(); |
package mypackage.events; | |
public class Sensor { | |
private int deviceId; | |
private double temperature; | |
private long timestamp; | |
public Sensor(int deviceId, double temperature, long timestamp) | |
{ | |
this.deviceId = deviceId; |
package mypackage.sources; | |
import mypackage.events.Sensor; | |
import org.apache.flink.streaming.api.functions.source.RichSourceFunction; | |
import java.util.concurrent.ThreadLocalRandom; | |
public class TemperatureSensor extends RichSourceFunction<Sensor> { | |
private boolean running = true; | |
@Override |
I hereby claim:
To claim this, I am signing this object: