Skip to content

Instantly share code, notes, and snippets.

@tillrohrmann
Created April 19, 2016 15:33
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save tillrohrmann/980bc997d9b8f1979f4a465b345d34dd to your computer and use it in GitHub Desktop.
Save tillrohrmann/980bc997d9b8f1979f4a465b345d34dd to your computer and use it in GitHub Desktop.
package stsffap;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.util.Map;
import java.util.UUID;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class Job {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
//env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4,
5000));
//env.enableCheckpointing(15000);
env.setParallelism(4);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<String> kafkaStream = env.fromElements(
"01:43:43.592\t1\t2121\t{\"Pressure target - Value\":\"6\"}",
"01:43:43.596\t1\t2121\t{\"Flow target - Value\":\"23\"}",
"01:43:44.263\t1\t2121\t{\"Pressure target - Value\":\"7\"}",
"01:43:44.972\t1\t2121\t{\"Flow target - Value\":\"24\"}",
"01:43:45.176\t1\t2121\t{\"Flow target - Value\":\"25\"}",
"01:43:45.279\t1\t2121\t{\"Flow target - Value\":\"26\"}",
"01:43:45.382\t1\t2121\t{\"Flow target - Value\":\"27\"}",
"01:43:45.586\t1\t2121\t{\"Flow target - Value\":\"28\"}",
"01:43:46.310\t1\t2121\t{\"Pressure target - Value\":\"7\"}",
"01:43:46.350\t1\t2121\t{\"Flow target - Value\":\"28\"}",
"01:43:48.563\t1\t2121\t{\"Pressure target - Value\":\"7\"}",
"01:43:48.567\t1\t2121\t{\"Flow target - Value\":\"28\"}",
"01:43:51.850\t1\t2121\t{\"Pressure target - Value\":\"7\"}",
"01:43:51.890\t1\t2121\t{\"Flow target - Value\":\"28\"}",
"01:43:53.512\t1\t2121\t{\"Pressure target - Value\":\"7\"}",
"01:43:53.516\t1\t2121\t{\"Flow target - Value\":\"28\"}",
"01:43:56.108\t1\t2121\t{\"Pressure target - Value\":\"7\"}",
"01:43:56.112\t1\t2121\t{\"Flow target - Value\":\"28\"}",
"01:43:58.533\t1\t2121\t{\"Pressure target - Value\":\"7\"}",
"01:43:58.538\t1\t2121\t{\"Flow target - Value\":\"28\"}",
"01:44:01.600\t1\t2121\t{\"Pressure target - Value\":\"7\"}",
"01:44:01.630\t1\t2121\t{\"Flow target - Value\":\"28\"}",
"01:44:03.587\t1\t2121\t{\"Pressure target - Value\":\"7\"}",
"01:44:03.591\t1\t2121\t{\"Flow target - Value\":\"28\"}",
"01:44:06.180\t1\t2121\t{\"Pressure target - Value\":\"7\"}",
"01:44:06.230\t1\t2121\t{\"Flow target - Value\":\"28\"}"
);
DataStream<Tuple5<String, String, String, String, Double>> data =
kafkaStream.flatMap(new SplitMapper());
SingleOutputStreamOperator<Tuple6<String, String, String, Double, Double, Double>> windowedData =
data.filter(new FilterFunction<Tuple5<String, String, String, String, Double>>() {
private static final long serialVersionUID = -5952425756492833594L;
@Override
public boolean filter(Tuple5<String, String, String, String, Double>
val) throws Exception {
return val.f3.contains("target - Value");
}
})
.keyBy(3)
.timeWindow(Time.seconds(10), Time.seconds(1))
.fold(new Tuple6<>("", "", "", 0.0d, 0.0d, 0.0d), new PressureElementCount());
windowedData.print();
Pattern<Tuple6<String, String, String, Double, Double, Double>, ?>
FlowFirstPattern =
Pattern.<Tuple6<String, String, String, Double, Double, Double>>begin("FlowOver10")
.where(new FilterFunction<Tuple6<String, String, String, Double, Double, Double>>() {
private static final long serialVersionUID = 5861517245439863889L;
@Override
public boolean filter(Tuple6<String, String, String, Double, Double, Double> value) throws Exception {
double avgFlow = (value.f5 / value.f4);
return value.f2.contains("Flow target - Value") && avgFlow > 25.0 && (value.f3 > avgFlow * 1.0);
}
})
.followedBy("PressureOver10").where(new FilterFunction<Tuple6<String, String, String, Double, Double, Double>>() {
private static final long serialVersionUID = -4037517308930307522L;
@Override
public boolean filter(Tuple6<String, String, String, Double, Double,
Double> value) throws Exception {
double avgPressure = (value.f5 / value.f4);
//System.out.println("Pressure: " + avgPressure);
return value.f2.equals("Pressure target - Value") && (avgPressure >
5.0);// && (value.f2 > avgPressure*1.0);
}
})
.within(Time.seconds(10));
PatternStream<Tuple6<String, String, String, Double, Double, Double>>
FlowFirstPatternStream = CEP.pattern(windowedData, FlowFirstPattern);
DataStream<String> warning = FlowFirstPatternStream.select(new PlacingWorkingTrocarWarning());
warning.print();
env.execute();
}
private static class PlacingWorkingTrocarWarning implements
PatternSelectFunction<Tuple6<String, String, String, Double, Double, Double>, String> {
private static final long serialVersionUID = 2576609635170800026L;
@Override
public String select(Map<String, Tuple6<String, String, String, Double, Double, Double>> pat) throws Exception {
//Tuple5<String, String, Double, Double, Double> pressure =
pat.get("PressureOver10");
//Tuple5<String, String, Double, Double, Double> flow =
pat.get("FlowOver10");
return " ####### Warning! FlowEvent ####### ";
}
}
private static class PressureElementCount implements
FoldFunction<Tuple5<String, String, String, String, Double>,
Tuple6<String, String, String, Double, Double, Double>> {
private static final long serialVersionUID = -1081752808506520154L;
@Override
public Tuple6<String, String, String, Double, Double, Double>
fold(Tuple6<String, String, String, Double, Double, Double> init,
Tuple5<String, String, String, String, Double> val) throws Exception {
double count = init.f4 + 1.0d;
double sum = init.f5 + val.f4; //!!!
return new Tuple6<>(val.f0, val.f1, val.f3, val.f4, count, sum);
}
}
private static class SplitMapper extends RichFlatMapFunction<String,
Tuple5<String, String, String, String, Double>> {
private static final long serialVersionUID = 7297664214330222193L;
@Override
public void flatMap(String msg, Collector<Tuple5<String, String, String, String, Double>> out) throws Exception {
getRuntimeContext().getLongCounter("eventCount").add(1L);
String[] split_msg = msg.split("\t");
String DeviceId = split_msg[1];
String[] array = split_msg[3].split(", \"");
for (String a : array) {
String[] split = a.split(":");
String val = split[1];
String testname = "test1";
String nom = val.replace("\"", "").replace("{", "").replace("}",
"").replace(",", ".");
String param = split[0].replace("\"", "").replace("{", "").replace("}",
"");
out.collect(new Tuple5<String, String, String, String, Double>(UUID.randomUUID().toString(), testname, DeviceId, param, Double.parseDouble(nom)));
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment