Ebi Soroush e3oroush
@e3oroush
e3oroush / TimeWindowedCEP.java
Last active July 13, 2019 17:25
Time-window aggregation of CEP matches in Flink
SingleOutputStreamOperator<Tuple3<Integer, Long, Integer>> aggregatedMatch = result.keyBy(new KeySelector<Sensor, Integer>() {
    @Override
    public Integer getKey(Sensor sensor) throws Exception {
        return sensor.getDeviceId();
    }
}).window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
        .aggregate(new AggregateFunction<Sensor, Tuple3<Integer, Long, Integer>, Tuple3<Integer, Long, Integer>>() {
            @Override
            public Tuple3<Integer, Long, Integer> createAccumulator() {
                Tuple3<Integer, Long, Integer> acc = new Tuple3<>();
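
The preview above is cut off, so the actual accumulator logic is not visible here. As a rough illustration of what a one-minute tumbling window keyed by device does, here is a plain-Java sketch that does not use the Flink API. The class name, the method names, and the choice to count readings are all assumptions made for illustration; the gist's own accumulator may compute something else.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical plain-Java sketch of tumbling-window bucketing; not Flink code.
final class TumblingWindowSketch {
    // One-minute window, matching Time.minutes(1) in the snippet above.
    static final long WINDOW_MS = 60_000L;

    /** Returns the start of the tumbling window that contains the timestamp. */
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_MS);
    }

    /** Counts readings per "deviceId@windowStart" bucket (counting is an assumed aggregate). */
    static Map<String, Integer> countPerDeviceAndWindow(int[] deviceIds, long[] timestampsMs) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (int i = 0; i < deviceIds.length; i++) {
            String key = deviceIds[i] + "@" + windowStart(timestampsMs[i]);
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }
}
```

Every timestamp belongs to exactly one window, so each reading is counted once. In the Flink job the window assigner and the AggregateFunction do this bookkeeping.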
e3oroush / SimpleCEP.java
Created July 13, 2019 17:19
simple match expression
// define a simple pattern and condition to detect in the data stream
Pattern<Sensor,?> highTempPattern = Pattern.<Sensor>begin("first").where(new SimpleCondition<Sensor>() {
    @Override
    public boolean filter(Sensor sensor) throws Exception {
        return sensor.getTemperature() > 60;
    }
});
// get the resulting data stream from the input data stream based on the defined CEP pattern
DataStream<Sensor> result = CEP.pattern(inputStream.keyBy(new KeySelector<Sensor, Integer>() {
    @Override
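
The condition in this pattern is a strict threshold test, so a reading of exactly 60 does not match. A minimal plain-Java sketch of the same test, outside Flink, is shown below. The class name and the `matches` helper are hypothetical and exist only for illustration.

```java
import java.util.Arrays;
import java.util.function.DoublePredicate;

// Hypothetical sketch of the "temperature > 60" condition; not Flink CEP code.
final class HighTempConditionSketch {
    // Same comparison as the SimpleCondition above: strictly greater than 60.
    static final DoublePredicate HIGH_TEMP = t -> t > 60;

    /** Returns only the temperatures that satisfy the condition, in input order. */
    static double[] matches(double[] temperatures) {
        return Arrays.stream(temperatures).filter(HIGH_TEMP).toArray();
    }
}
```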
e3oroush / StreamingJob.java
Last active July 13, 2019 17:16
Data stream Processing
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// set the time characteristic to event time
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// assign a fictitious stream of data as our data source and extract the timestamp field
DataStream<Sensor> inputStream = env.addSource(new TemperatureSensor())
        .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Sensor>() {
            @Override
            public long extractAscendingTimestamp(Sensor sensor) {
                return sensor.getTimestamp();
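
An ascending-timestamp extractor assumes that timestamps within a source partition never decrease. To my understanding, Flink then emits watermarks that trail the highest timestamp seen so far by one millisecond. The plain-Java sketch below mimics that bookkeeping under this assumption. The class and method names are hypothetical and this is not the Flink implementation.

```java
// Hypothetical sketch of ascending-timestamp watermark tracking; not Flink code.
final class AscendingWatermarkSketch {
    // Highest timestamp seen so far; Long.MIN_VALUE means "nothing seen yet".
    private long maxTimestamp = Long.MIN_VALUE;

    /** Records an element timestamp; out-of-order (smaller) timestamps do not move the maximum. */
    long onElement(long timestampMs) {
        if (timestampMs >= maxTimestamp) {
            maxTimestamp = timestampMs;
        }
        return timestampMs;
    }

    /** Watermark trails the maximum timestamp by 1 ms (assumed behavior). */
    long currentWatermark() {
        return maxTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : maxTimestamp - 1;
    }
}
```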
e3oroush / Sensor.java
Created July 13, 2019 17:08
Simple Java class for device and temperature
package mypackage.events;
public class Sensor {
    private int deviceId;
    private double temperature;
    private long timestamp;
    public Sensor(int deviceId, double temperature, long timestamp)
    {
        this.deviceId = deviceId;
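
The preview stops inside the constructor. The other snippets on this page call `getDeviceId()`, `getTemperature()`, and `getTimestamp()`, so a complete class would presumably look something like the sketch below. The package declaration is omitted to keep the example self-contained, and `toString()` is an addition for illustration that may not exist in the original gist.

```java
// Sketch of a complete Sensor event class; getter names are taken from calls in the other snippets.
class Sensor {
    private int deviceId;
    private double temperature;
    private long timestamp;

    Sensor(int deviceId, double temperature, long timestamp) {
        this.deviceId = deviceId;
        this.temperature = temperature;
        this.timestamp = timestamp;
    }

    int getDeviceId() {
        return deviceId;
    }

    double getTemperature() {
        return temperature;
    }

    long getTimestamp() {
        return timestamp;
    }

    // Assumed helper for readable log output; not confirmed by the source.
    @Override
    public String toString() {
        return "Sensor{deviceId=" + deviceId
                + ", temperature=" + temperature
                + ", timestamp=" + timestamp + "}";
    }
}
```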
e3oroush / TemperatureSensor.java
Created July 13, 2019 17:06
Source of random data stream
package mypackage.sources;
import mypackage.events.Sensor;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import java.util.concurrent.ThreadLocalRandom;
public class TemperatureSensor extends RichSourceFunction<Sensor> {
    // volatile, because the flag is normally cleared in cancel(), which may run on a different thread than run()
    private volatile boolean running = true;
    @Override
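
The body of the source is not visible in this preview. As a guess at the kind of value generation it performs, the plain-Java sketch below draws a random device id and temperature with `ThreadLocalRandom`. The class name, the method name, the parameters, and the idea of returning the pair as an array are all assumptions; the real source presumably builds `Sensor` objects and emits them in a loop.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of random reading generation; not the gist's actual source function.
final class RandomReadingSketch {
    /**
     * Returns {deviceId, temperature}, with deviceId in [0, deviceCount)
     * and temperature in [minTemp, maxTemp). All ranges are assumed parameters.
     */
    static double[] nextReading(int deviceCount, double minTemp, double maxTemp) {
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        int deviceId = rnd.nextInt(deviceCount);
        double temperature = rnd.nextDouble(minTemp, maxTemp);
        return new double[] {deviceId, temperature};
    }
}
```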
e3oroush / getMediaInformation.sh
Created September 5, 2018 20:51
A simple and handy tool for calculating the video duration of a single file or of a directory containing multiple video files
#
# Author: Ebi: ebrhim.soroush@gmail.com
# Requires mediainfo and libxml-xpath-perl. Either source this file or copy this script into your ~/.bashrc.
# sudo apt install mediainfo libxml-xpath-perl
#
read_dom () {
    local IFS=\>
    read -d \< ENTITY CONTENT
};

Keybase proof

I hereby claim:

  • I am e-soroush on github.
  • I am esoroush (https://keybase.io/esoroush) on keybase.
  • I have a public key ASBBUdHyOwc0_8Hc3Ogu6Tghwfl1GoxyaCyXvIP92H_UBgo

To claim this, I am signing this object: