Created
June 16, 2017 13:42
-
-
Save revolutionisme/cf675ceee1492b93be020d4526bc9d38 to your computer and use it in GitHub Desktop.
Not working properly with event time; works with processing time (the last pattern match is missing unless a subsequent event arrives to advance the watermark)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.airplus.poc; | |
import com.airplus.poc.helper.CEPRule; | |
import com.airplus.poc.helper.Configure; | |
import com.airplus.poc.helper.StringSerializerToEvent; | |
import com.airplus.poc.model.BAMEvent; | |
import org.apache.flink.api.common.ExecutionConfig; | |
import org.apache.flink.api.java.functions.KeySelector; | |
import org.apache.flink.cep.CEP; | |
import org.apache.flink.cep.PatternSelectFunction; | |
import org.apache.flink.cep.PatternStream; | |
import org.apache.flink.cep.PatternTimeoutFunction; | |
import org.apache.flink.cep.pattern.Pattern; | |
import org.apache.flink.streaming.api.TimeCharacteristic; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; | |
import org.apache.flink.streaming.api.watermark.Watermark; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; | |
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; | |
import org.apache.flink.types.Either; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.util.*; | |
/** | |
* @author Biplob Biswas on 23.05.2017. | |
*/ | |
public class CEPForBAM { | |
public static void main(String[] args) throws Exception { | |
Logger log = LoggerFactory.getLogger(CEPForBAM.class); | |
log.info("Start flink job"); | |
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); | |
//env.getConfig().setAutoWatermarkInterval(1000); | |
// configure Kafka consumer | |
Properties props = new Properties(); | |
props = Configure.getDefaultKafkaReadingProperties(props); | |
Properties rules = new Properties(); | |
try (final InputStream stream = CEPForBAM.class.getResourceAsStream("/rule.file")) { | |
rules.load(stream); | |
} catch (IOException ex) { | |
log.error("Error-Loading rule.file", ex); | |
} | |
log.info("Setting up Flink Kafka Consumer"); | |
FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>( | |
Arrays.asList("iom", "edl", "tps"), | |
new StringSerializerToEvent(), | |
props); | |
DataStream<BAMEvent> events = env.addSource(kafkaSource) | |
.filter(Objects::nonNull) | |
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<BAMEvent>(Time.seconds(30)) { | |
private static final long serialVersionUID = -7228487240278428374L; | |
@Override | |
public long extractTimestamp(BAMEvent event) { | |
return event.getTimestamp(); | |
} | |
}); | |
Pattern<BAMEvent, ?> eventPattern = | |
CEPRule.getPatternForRule( | |
rules.getProperty("rule1.event1"), | |
rules.getProperty("rule1.event2"), | |
rules.getProperty("rule1.time") | |
); | |
DataStream<BAMEvent> partitionedInput = events | |
.keyBy((KeySelector<BAMEvent, String>) BAMEvent::getId); | |
partitionedInput.print(); | |
PatternStream<BAMEvent> patternStream = CEP.pattern(partitionedInput, eventPattern); | |
DataStream<Either<String, String>> alerts = patternStream.select(new PatternTimeoutFunction<BAMEvent, String>() { | |
private static final long serialVersionUID = -8717561187522704500L; | |
@Override | |
public String timeout(Map<String, List<BAMEvent>> map, long l) throws Exception { | |
return "Anomaly Events: " + map.toString() + " @ " + l; | |
} | |
}, new PatternSelectFunction<BAMEvent, String>() { | |
private static final long serialVersionUID = 3144439966791408980L; | |
@Override | |
public String select(Map<String, List<BAMEvent>> pattern) throws Exception { | |
//BAMEvent bamEvent = pattern.get("first").get(0); | |
return "Matched Events: " + pattern.toString(); | |
} | |
}); | |
//Todo: Simply returning alert events as string to KafkaTopic (needs to be changed appropriately if needed) | |
FlinkKafkaProducer010<String> kafkaSink = Configure.getKafkaSink(); | |
alerts.map(alert -> { | |
if (alert.isLeft()) { | |
return alert.left(); | |
} | |
return null; | |
}).filter(Objects::nonNull).print(); | |
alerts.map(alert -> { | |
// TODO: Change to left | |
/* Right is the one where events are correlated, shown as we dont have anomalies right now | |
For the final code, the left alert(Anomalies) should be sent to the kafka sink. | |
*/ | |
if (alert.isRight()) { | |
return alert.right(); | |
} | |
return alert.left(); | |
}) | |
.filter(Objects::nonNull) | |
.addSink(kafkaSink); | |
env.execute("BAM Processing"); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment