@revolutionisme
Created June 6, 2017 13:19
For reproducing no alert generation in FlinkCEP
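A note on input (an assumption, not part of the original gist): the job below parses CSV records of the form id,payload,timestamp,value from the Kafka topic "test", with the timestamp in milliseconds. Two records such as

1,a,1000,0
2,b,1500,1

should satisfy the pattern, since the second record's value equals the first record's id and both events fall within the one-second window, so an alert would be expected.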
import ch.qos.logback.classic.Level;
import ch.qos.logback.classic.Logger;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.slf4j.LoggerFactory;
import java.util.*;
/**
* @author Biplob Biswas on 06.06.2017.
*/
public class CEPKafkaTest {
    private static final String ZKEEPER = "localhost:2181";
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    public static void main(String[] args) throws Exception {
        Logger LOG = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME);
        LOG.setLevel(Level.INFO);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        System.out.println(env.getStreamTimeCharacteristic());
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //env.getConfig().setAutoWatermarkInterval(10000);

        // configure Kafka consumer
        Properties props = new Properties();
        props = getDefaultProperties(props);

        FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>(
                Arrays.asList("test"),
                new SimpleStringSchema(),
                props);
        // parse CSV records of the form "id,payload,timestamp,value"
        DataStream<Event> input = env.addSource(kafkaSource)
                .map(str -> {
                    String[] strArr = str.split(",");
                    return new Event(Integer.parseInt(strArr[0]), strArr[1],
                            Integer.parseInt(strArr[2]), Integer.parseInt(strArr[3]));
                })
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(60)) {
                    private static final long serialVersionUID = -7228487240278428374L;

                    @Override
                    public long extractTimestamp(Event event) {
                        return event.getTimestamp();
                    }
                });

        input.print();
        // Pattern: an event with payload "a", followed (within 1 second of event time)
        // by an event whose value equals the id of the first matched event.
        Pattern<Event, ?> pattern = Pattern.<Event>begin("id").where(new SimpleCondition<Event>() {
            private static final long serialVersionUID = 7219646616484327688L;

            @Override
            public boolean filter(Event myEvent) throws Exception {
                return Objects.equals(myEvent.getPayload(), "a");
            }
        }).followedBy("payload").where(new IterativeCondition<Event>() {
            private static final long serialVersionUID = -9216505110246259082L;

            @Override
            public boolean filter(Event value, Context<Event> ctx) throws Exception {
                for (Event event : ctx.getEventsForPattern("id")) {
                    if (event.getId() == value.getValue())
                        return true;
                }
                return false;
            }
        }).within(Time.seconds(1));
        /* DataStream<Event> input = env.fromCollection(inputElements).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() {
            private static final long serialVersionUID = -6619787346214245526L;

            @Override
            public long extractAscendingTimestamp(Event myEvent) {
                return myEvent.getTimestamp();
            }
        }); */
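        /* Hypothetical inputElements for the commented-out variant above (not part of
           the original gist), mirroring the sample CSV records described at the top:

           List<Event> inputElements = Arrays.asList(
                   new Event(1, "a", 1000, 0),
                   new Event(2, "b", 1500, 1));
        */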
        // Key every event to the same constant key so the pattern is evaluated
        // over the whole stream as a single logical partition.
        PatternStream<Event> patternStream = CEP.pattern(input.keyBy(new KeySelector<Event, Long>() {
            private static final long serialVersionUID = 6928745840509494198L;

            @Override
            public Long getKey(Event myEvent) throws Exception {
                return 1L;
            }
        }), pattern);
        patternStream.select(new PatternTimeoutFunction<Event, String>() {
            private static final long serialVersionUID = 300759199619789416L;

            // called for partial matches that time out after the 1-second window
            @Override
            public String timeout(Map<String, List<Event>> map, long l) throws Exception {
                return map.toString() + " @ " + l;
            }
        }, new PatternSelectFunction<Event, String>() {
            private static final long serialVersionUID = 732172159423132724L;

            // called for completed matches, i.e. the expected alerts
            @Override
            public String select(Map<String, List<Event>> map) throws Exception {
                return map.toString() + "_" + Event.class.getSimpleName().equals(map.get("id").get(0).getClass().getSimpleName());
            }
        }).print();

        env.execute("Bug Reproducing Job");
    }
    private static Properties getDefaultProperties(Properties prop) {
        prop.put("group.id", "FlinkCEP");
        prop.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        prop.put("zookeeper.connect", ZKEEPER);
        prop.put("auto.offset.reset", "latest");
        return prop;
    }
}
class Event {
    private int id;
    private String payload;
    private int timeStamp;
    private int value;

    Event(int id, String payload, int timeStamp, int value) {
        this.id = id;
        this.payload = payload;
        this.timeStamp = timeStamp;
        this.value = value;
    }

    String getPayload() {
        return payload;
    }

    Long getId() {
        return Long.parseLong(String.valueOf(id));
    }

    long getTimestamp() {
        return (long) timeStamp;
    }

    int getValue() {
        return value;
    }

    @Override
    public String toString() {
        return id + "_" + payload + "_" + timeStamp + "_" + value;
    }
}