Last active: August 29, 2015 14:23
-
-
Save nipunarora/ed987e45028250248edc to your computer and use it in GitHub Desktop.
Spark Streaming broadcast variable
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SparkConf sparkConf = new SparkConf(); | |
sparkConf.setAppName("Type2OnlineViolationChecker"); | |
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(10000)); | |
br = ssc.sparkContext().broadcast(hm); | |
JavaReceiverInputDStream<String> lines = ssc.socketTextStream( | |
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER); | |
JavaDStream<String> words = lines.flatMap( | |
new FlatMapFunction<String, String>() { | |
@Override | |
public Iterable<String> call(String x) { | |
return Lists.newArrayList(NEWSPACE.split(x)); | |
} | |
}); | |
words.persist(); | |
JavaDStream<String> matched = words.filter( | |
new Function<String, Boolean>() { | |
public Boolean call(String line) { | |
return line.contains("pattern"); | |
} | |
}); | |
JavaDStream<Tuple3<Long,Double,String>> split = matched.map(new GenerateType2Scores()); | |
JavaDStream<Tuple3<Long,Double,String>> filtered = split.filter( | |
new Function<Tuple3<Long, Double, String>, Boolean>() { | |
@Override | |
public Boolean call(Tuple3<Long, Double, String> longDoubleStringTuple3) throws Exception { | |
if(longDoubleStringTuple3._2() > 0.0) | |
return true; | |
else | |
return false; | |
} | |
} | |
); | |
JavaDStream<String> anomalyOutput = filtered.map( | |
new Function<Tuple3<Long, Double, String>, String>() { | |
@Override | |
public String call(Tuple3<Long, Double, String> longDoubleStringTuple3) throws Exception { | |
String logMsg = longDoubleStringTuple3._3(); | |
Double score = longDoubleStringTuple3._2(); | |
String out = "\"anomaly_reason\"" + ":\"" + Type2ViolationChecker.getMostAnomalousField(br.getValue(),logMsg) + | |
"\"," + "\"anomaly_score\":" + score + "% ," + logMsg + "\n"; | |
return out; | |
} | |
} | |
); | |
anomalyOutput.foreach( | |
new Function<JavaRDD<String>, Void>() { | |
@Override | |
public Void call(JavaRDD<String> stringJavaRDD) throws Exception { | |
List<String> list = stringJavaRDD.collect(); | |
for (String str : list) | |
Files.append(str, type2_outputFile, Charset.defaultCharset()); | |
return null; | |
} | |
} | |
); | |
ssc.start(); | |
ssc.awaitTermination(); | |
} | |
} | |
class GenerateType2Scores implements Function<String, Tuple3<Long, Double, String>> { | |
@Override | |
public Tuple3<Long, Double, String> call(String s) throws Exception{ | |
Long time = Type2ViolationChecker.getMTS(s); | |
HashMap<String,FieldModel> temphm= Type2ViolationChecker.br.value(); | |
Double score = Type2ViolationChecker.getAnomalyScore(temphm,s); | |
return new Tuple3<Long, Double, String>(time,score, s);} | |
} | |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.