Created
June 6, 2017 13:19
-
-
Save revolutionisme/c3878a6420b322176ac686cbf1a8ac43 to your computer and use it in GitHub Desktop.
For reproducing no alert generation in FlinkCEP
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import ch.qos.logback.classic.Level; | |
import ch.qos.logback.classic.Logger; | |
import org.apache.flink.api.java.functions.KeySelector; | |
import org.apache.flink.cep.CEP; | |
import org.apache.flink.cep.PatternSelectFunction; | |
import org.apache.flink.cep.PatternStream; | |
import org.apache.flink.cep.PatternTimeoutFunction; | |
import org.apache.flink.cep.pattern.Pattern; | |
import org.apache.flink.cep.pattern.conditions.IterativeCondition; | |
import org.apache.flink.cep.pattern.conditions.SimpleCondition; | |
import org.apache.flink.streaming.api.TimeCharacteristic; | |
import org.apache.flink.streaming.api.datastream.DataStream; | |
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; | |
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor; | |
import org.apache.flink.streaming.api.windowing.time.Time; | |
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; | |
import org.apache.flink.streaming.util.serialization.SimpleStringSchema; | |
import org.slf4j.LoggerFactory; | |
import java.util.*; | |
/** | |
* @author Biplob Biswas on 06.06.2017. | |
*/ | |
public class CEPKafkaTest{ | |
private static final String ZKEEPER = "localhost:2181"; | |
private static final String BOOTSTRAP_SERVERS = "localhost:9092"; | |
public static void main(String[] args) throws Exception { | |
Logger LOG = (Logger) LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME); | |
LOG.setLevel(Level.INFO); | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
System.out.println(env.getStreamTimeCharacteristic()); | |
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); | |
//env.getConfig().setAutoWatermarkInterval(10000); | |
// configure Kafka consumer | |
Properties props = new Properties(); | |
props = getDefaultProperties(props); | |
FlinkKafkaConsumer010<String> kafkaSource = new FlinkKafkaConsumer010<>( | |
Arrays.asList("test"), | |
new SimpleStringSchema(), | |
props); | |
DataStream<Event> input = env.addSource(kafkaSource) | |
.map(str -> { | |
String[] strArr = str.split(","); | |
return new Event(Integer.parseInt(strArr[0]),strArr[1],Integer.parseInt(strArr[2]),Integer.parseInt(strArr[3])); | |
}) | |
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Event>(Time.seconds(60)) { | |
private static final long serialVersionUID = -7228487240278428374L; | |
@Override | |
public long extractTimestamp(Event event) { | |
return event.getTimestamp(); | |
} | |
}); | |
input.print(); | |
Pattern<Event, ?> pattern = Pattern.<Event>begin("id").where(new SimpleCondition<Event>() { | |
private static final long serialVersionUID = 7219646616484327688L; | |
@Override | |
public boolean filter(Event myEvent) throws Exception { | |
return Objects.equals(myEvent.getPayload(), "a"); | |
} | |
}).followedBy("payload").where(new IterativeCondition<Event>() { | |
private static final long serialVersionUID = -9216505110246259082L; | |
@Override | |
public boolean filter(Event value, Context<Event> ctx) throws Exception { | |
for (Event event : ctx.getEventsForPattern("id")) { | |
if(event.getId() == value.getValue()) | |
return true; | |
} | |
return false; | |
} | |
}).within(Time.seconds(1)); | |
/* DataStream<Event> input = env.fromCollection(inputElements).assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Event>() { | |
private static final long serialVersionUID = -6619787346214245526L; | |
@Override | |
public long extractAscendingTimestamp(Event myEvent) { | |
return myEvent.getTimestamp(); | |
} | |
});*/ | |
PatternStream<Event> patternStream = CEP.pattern(input.keyBy(new KeySelector<Event, Long>() { | |
private static final long serialVersionUID = 6928745840509494198L; | |
@Override | |
public Long getKey(Event myEvent) throws Exception { | |
return 1L; | |
} | |
}), pattern); | |
patternStream.select(new PatternTimeoutFunction<Event, String>() { | |
@Override | |
public String timeout(Map<String, List<Event>> map, long l) throws Exception { | |
return map.toString() +" @ "+ l; | |
} | |
private static final long serialVersionUID = 300759199619789416L; | |
}, new PatternSelectFunction<Event, String>() { | |
@Override | |
public String select(Map<String, List<Event>> map) throws Exception { | |
return map.toString() + "_" + Event.class.getSimpleName().equals(map.get("id").get(0).getClass().getSimpleName()); | |
} | |
private static final long serialVersionUID = 732172159423132724L; | |
}).print(); | |
env.execute("Bug Reproducing Job"); | |
} | |
private static Properties getDefaultProperties(Properties prop) { | |
prop.put("group.id", "FlinkCEP"); | |
prop.put("bootstrap.servers", BOOTSTRAP_SERVERS); | |
prop.put("zookeeper.connect", ZKEEPER); | |
prop.put("auto.offset.reset", "latest"); | |
return prop; | |
} | |
} | |
/**
 * Immutable event parsed from a Kafka record of the form
 * "id,payload,timestamp,value".
 */
class Event {
    // All fields final: the class exposes no setters, so make immutability explicit.
    private final int id;
    private final String payload;
    private final int timeStamp;
    private final int value;

    /**
     * @param id        event identifier
     * @param payload   payload string (the CEP pattern matches on "a")
     * @param timeStamp event-time timestamp (unit unconfirmed — see the
     *                  timestamp extractor in the job; presumably not epoch millis,
     *                  since it is an int)
     * @param value     value matched against a previous event's id by the pattern
     */
    Event(int id, String payload, int timeStamp, int value) {
        this.id = id;
        this.payload = payload;
        this.timeStamp = timeStamp;
        this.value = value;
    }

    String getPayload() {
        return payload;
    }

    /**
     * Returns the id widened to {@code Long}.
     * (Was {@code Long.parseLong(String.valueOf(id))} — a needless
     * int → String → long round trip; a widening cast is equivalent.)
     */
    Long getId() {
        return (long) id;
    }

    /** Returns the raw timestamp widened to long, as Flink's extractor expects. */
    long getTimestamp() {
        return timeStamp;
    }

    int getValue() {
        return value;
    }

    /** Underscore-joined form, e.g. "1_a_5_7"; appears in the job's printed output. */
    @Override
    public String toString() {
        return id + "_" + payload + "_" + timeStamp + "_" + value;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment