Skip to content

Instantly share code, notes, and snippets.

@revolutionisme
Created June 16, 2017 13:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save revolutionisme/38578e631f7a15f02cb2488f9fe56c76 to your computer and use it in GitHub Desktop.
Save revolutionisme/38578e631f7a15f02cb2488f9fe56c76 to your computer and use it in GitHub Desktop.
Rule pattern
package com.airplus.poc.helper;
import com.airplus.poc.model.BAMEvent;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* @author Biplob Biswas on 09.06.2017.
*/
public class CEPRule {
private static Logger log = LoggerFactory.getLogger(CEPRule.class);
public static Pattern<BAMEvent,?> getPatternForRule(String event1, String event2, String timeToWait){
Time withinTimeRule = getTimeToWait(timeToWait);
log.debug("Event 1 - {}", event1);
log.debug("Event 2 - {}", event2);
log.info("Time - {} {}",withinTimeRule.getSize(), withinTimeRule.getUnit());
return Pattern.<BAMEvent>begin("first")
.where(new SimpleCondition<BAMEvent>() {
private static final long serialVersionUID = 1390448281048961616L;
@Override
public boolean filter(BAMEvent event) throws Exception {
if(event.getEventName().equals(event1)){
//log.info("rreevent");
return true;
}
return false;
//return event.getEventName().equals(event1);
}
})
.followedBy("second")
.where(new IterativeCondition<BAMEvent>() {
private static final long serialVersionUID = -9216505110246259082L;
@Override
public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {
if (!secondEvent.getEventName().equals(event2)) {
return false;
}
for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
if (secondEvent.getCorrelationID().contains(firstEvent.getEventId())) {
//log.info("Event1id: {}, Event2corrid: {}",firstEvent.getEventId(), secondEvent.getCorrelationID());
return true;
}
}
//log.info("Should never come here - sec Event - {}",secondEvent.getEventName());
return false;
}
})
.within(withinTimeRule);
}
private static Time getTimeToWait(String timeString){
String[] time = timeString.split(" ");
String timeUnit = time[1].toLowerCase();
long timelength = Long.parseLong(time[0]);
switch (timeUnit){
case "seconds" : return Time.seconds(timelength);
case "minutes" : return Time.minutes(timelength);
case "hours" : return Time.hours(timelength);
case "days" : return Time.days(timelength);
}
return null;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment