Skip to content

Instantly share code, notes, and snippets.

@ldenson11
Created January 29, 2020 22:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ldenson11/3151bbbbf860ea365291ce5d47c90fb9 to your computer and use it in GitHub Desktop.
Save ldenson11/3151bbbbf860ea365291ce5d47c90fb9 to your computer and use it in GitHub Desktop.
StreamExecutionEnvironment env = ...
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Event> input = ...
DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() {
@Override
public Integer getKey(Event value) throws Exception {
return value.getId();
}
});
Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("First Event")
.subtype(TransactionEvent.class)
.where(evt -> evt.getTransaction() >= LARGE_TRANSACTION_THRESHOLD)
.next("Second Event")
.subtype(TransactionEvent.class)
.where(evt -> evt.getPIN() >= PIN_MISTYPE_THRESHOLD)
.within(Time.seconds(10));
PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern);
DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() {
@Override
public Alert select(Map<String, List<Event>> pattern) throws Exception {
return createAlert(pattern);
}
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment