Created
June 16, 2017 13:43
-
-
Save revolutionisme/38578e631f7a15f02cb2488f9fe56c76 to your computer and use it in GitHub Desktop.
Rule pattern
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.airplus.poc.helper; | |
import com.airplus.poc.model.BAMEvent; | |
import org.apache.flink.cep.pattern.Pattern; | |
import org.apache.flink.cep.pattern.conditions.IterativeCondition; | |
import org.apache.flink.cep.pattern.conditions.SimpleCondition; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* @author Biplob Biswas on 09.06.2017. | |
*/ | |
public class CEPRule { | |
private static Logger log = LoggerFactory.getLogger(CEPRule.class); | |
public static Pattern<BAMEvent,?> getPatternForRule(String event1, String event2, String timeToWait){ | |
Time withinTimeRule = getTimeToWait(timeToWait); | |
log.debug("Event 1 - {}", event1); | |
log.debug("Event 2 - {}", event2); | |
log.info("Time - {} {}",withinTimeRule.getSize(), withinTimeRule.getUnit()); | |
return Pattern.<BAMEvent>begin("first") | |
.where(new SimpleCondition<BAMEvent>() { | |
private static final long serialVersionUID = 1390448281048961616L; | |
@Override | |
public boolean filter(BAMEvent event) throws Exception { | |
if(event.getEventName().equals(event1)){ | |
//log.info("rreevent"); | |
return true; | |
} | |
return false; | |
//return event.getEventName().equals(event1); | |
} | |
}) | |
.followedBy("second") | |
.where(new IterativeCondition<BAMEvent>() { | |
private static final long serialVersionUID = -9216505110246259082L; | |
@Override | |
public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception { | |
if (!secondEvent.getEventName().equals(event2)) { | |
return false; | |
} | |
for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) { | |
if (secondEvent.getCorrelationID().contains(firstEvent.getEventId())) { | |
//log.info("Event1id: {}, Event2corrid: {}",firstEvent.getEventId(), secondEvent.getCorrelationID()); | |
return true; | |
} | |
} | |
//log.info("Should never come here - sec Event - {}",secondEvent.getEventName()); | |
return false; | |
} | |
}) | |
.within(withinTimeRule); | |
} | |
private static Time getTimeToWait(String timeString){ | |
String[] time = timeString.split(" "); | |
String timeUnit = time[1].toLowerCase(); | |
long timelength = Long.parseLong(time[0]); | |
switch (timeUnit){ | |
case "seconds" : return Time.seconds(timelength); | |
case "minutes" : return Time.minutes(timelength); | |
case "hours" : return Time.hours(timelength); | |
case "days" : return Time.days(timelength); | |
} | |
return null; | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment