@revolutionisme · Created June 16, 2017 13:42
Works as expected with processing time, but not with event time: the match/timeout for the last event is missing unless a subsequent event arrives to advance the watermark (see the watermark sketch after the listing).
package com.airplus.poc;

import com.airplus.poc.helper.CEPRule;
import com.airplus.poc.helper.Configure;
import com.airplus.poc.helper.StringSerializerToEvent;
import com.airplus.poc.model.BAMEvent;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
import org.apache.flink.types.Either;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
/**
 * @author Biplob Biswas on 23.05.2017.
 */
public class CEPForBAM {

    public static void main(String[] args) throws Exception {
        Logger log = LoggerFactory.getLogger(CEPForBAM.class);
        log.info("Starting Flink job");

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //env.getConfig().setAutoWatermarkInterval(1000);

        // Configure the Kafka consumer
        Properties props = new Properties();
        props = Configure.getDefaultKafkaReadingProperties(props);

        // Load the CEP rule definitions from the classpath
        Properties rules = new Properties();
        try (final InputStream stream = CEPForBAM.class.getResourceAsStream("/rule.file")) {
            rules.load(stream);
        } catch (IOException ex) {
            log.error("Error loading rule.file", ex);
        }
        log.info("Setting up Flink Kafka Consumer");
        FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
                Arrays.asList("iom", "edl", "tps"),
                new StringSerializerToEvent(),
                props);

        // Extract event-time timestamps and emit watermarks that tolerate
        // events arriving up to 30 seconds out of order
        DataStream<BAMEvent> events = env.addSource(kafkaSource)
                .filter(Objects::nonNull)
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(30)) {
                    private static final long serialVersionUID = -7228487240278428374L;

                    @Override
                    public long extractTimestamp(BAMEvent event) {
                        return event.getTimestamp();
                    }
                });
        // Build the CEP pattern from the rule file (two events and a time window)
        Pattern<BAMEvent, ?> eventPattern =
                CEPRule.getPatternForRule(
                        rules.getProperty("rule1.event1"),
                        rules.getProperty("rule1.event2"),
                        rules.getProperty("rule1.time"));

        // Key the stream so the pattern is matched per event id
        DataStream<BAMEvent> partitionedInput = events
                .keyBy((KeySelector<BAMEvent, String>) BAMEvent::getId);

        // Debug: print the keyed input stream
        partitionedInput.print();

        PatternStream<BAMEvent> patternStream = CEP.pattern(partitionedInput, eventPattern);
        DataStream<Either<String, String>> alerts = patternStream.select(new PatternTimeoutFunction<BAMEvent, String>() {
            private static final long serialVersionUID = -8717561187522704500L;

            // Called for partial matches that time out: the anomalies (Left side)
            @Override
            public String timeout(Map<String, List<BAMEvent>> map, long timeoutTimestamp) throws Exception {
                return "Anomaly Events: " + map + " @ " + timeoutTimestamp;
            }
        }, new PatternSelectFunction<BAMEvent, String>() {
            private static final long serialVersionUID = 3144439966791408980L;

            // Called for complete matches: the correlated events (Right side)
            @Override
            public String select(Map<String, List<BAMEvent>> pattern) throws Exception {
                //BAMEvent bamEvent = pattern.get("first").get(0);
                return "Matched Events: " + pattern;
            }
        });
        // TODO: Alert events are simply forwarded to the Kafka topic as strings;
        // change the serialization if a structured format is needed.
        FlinkKafkaProducer010<String> kafkaSink = Configure.getKafkaSink();

        // Debug: print the timed-out (anomaly) side
        alerts.map(alert -> {
            if (alert.isLeft()) {
                return alert.left();
            }
            return null;
        }).filter(Objects::nonNull).print();

        alerts.map(alert -> {
            /* TODO: Change this to the Left side.
             * Right holds the correlated (matched) events and is forwarded here only
             * because no anomalies are produced yet; in the final version the Left
             * alerts (anomalies) should be the ones sent to the Kafka sink.
             */
            if (alert.isRight()) {
                return alert.right();
            }
            return alert.left();
        })
                .filter(Objects::nonNull)
                .addSink(kafkaSink);

        env.execute("BAM Processing");
    }
}
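
As the description notes, with event time the result for the last event only appears once a later event arrives: BoundedOutOfOrdernessTimestampExtractor advances the watermark only when new elements come in, so the pending CEP timeout never fires on an idle source. Below is a minimal sketch of one possible workaround; it is not part of the original gist, assumes the Flink 1.3-era AssignerWithPeriodicWatermarks API, and assumes BAMEvent timestamps are epoch milliseconds roughly aligned with the wall clock. The assigner keeps advancing the watermark from the wall clock while the source is idle:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Sketch only: a periodic watermark assigner that falls back to the wall clock
 * during idle periods, so the pending match/timeout for the last event can fire
 * without waiting for a subsequent event.
 */
public class IdleAwareWatermarkAssigner implements AssignerWithPeriodicWatermarks<BAMEvent> {

    private static final long serialVersionUID = 1L;

    // Same out-of-orderness bound as the extractor used in the job above
    private final long maxOutOfOrderness = Time.seconds(30).toMilliseconds();
    private long maxTimestampSeen = Long.MIN_VALUE;

    @Override
    public long extractTimestamp(BAMEvent event, long previousElementTimestamp) {
        long timestamp = event.getTimestamp();
        maxTimestampSeen = Math.max(maxTimestampSeen, timestamp);
        return timestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Assumption: event timestamps are epoch millis comparable to the wall
        // clock. Taking the max lets the watermark progress even when no events
        // arrive, instead of stalling at the last event's timestamp.
        long candidate = Math.max(maxTimestampSeen, System.currentTimeMillis());
        return new Watermark(candidate - maxOutOfOrderness);
    }
}

Wiring it in would replace the BoundedOutOfOrdernessTimestampExtractor above with .assignTimestampsAndWatermarks(new IdleAwareWatermarkAssigner()); the emission interval can be tuned via env.getConfig().setAutoWatermarkInterval(...), as hinted by the commented-out line in the job. The trade-off: if event time lags far behind the wall clock, genuine events would be considered late and dropped, so this only fits sources whose event time closely tracks processing time.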