Skip to content

Instantly share code, notes, and snippets.

@nipunarora
Last active August 29, 2015 14:23
Show Gist options
  • Save nipunarora/ed987e45028250248edc to your computer and use it in GitHub Desktop.
Spark Streaming broadcast variable
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("Type2OnlineViolationChecker");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(10000));
br = ssc.sparkContext().broadcast(hm);
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(
new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(NEWSPACE.split(x));
}
});
words.persist();
JavaDStream<String> matched = words.filter(
new Function<String, Boolean>() {
public Boolean call(String line) {
return line.contains("pattern");
}
});
JavaDStream<Tuple3<Long,Double,String>> split = matched.map(new GenerateType2Scores());
JavaDStream<Tuple3<Long,Double,String>> filtered = split.filter(
new Function<Tuple3<Long, Double, String>, Boolean>() {
@Override
public Boolean call(Tuple3<Long, Double, String> longDoubleStringTuple3) throws Exception {
if(longDoubleStringTuple3._2() > 0.0)
return true;
else
return false;
}
}
);
JavaDStream<String> anomalyOutput = filtered.map(
new Function<Tuple3<Long, Double, String>, String>() {
@Override
public String call(Tuple3<Long, Double, String> longDoubleStringTuple3) throws Exception {
String logMsg = longDoubleStringTuple3._3();
Double score = longDoubleStringTuple3._2();
String out = "\"anomaly_reason\"" + ":\"" + Type2ViolationChecker.getMostAnomalousField(br.getValue(),logMsg) +
"\"," + "\"anomaly_score\":" + score + "% ," + logMsg + "\n";
return out;
}
}
);
anomalyOutput.foreach(
new Function<JavaRDD<String>, Void>() {
@Override
public Void call(JavaRDD<String> stringJavaRDD) throws Exception {
List<String> list = stringJavaRDD.collect();
for (String str : list)
Files.append(str, type2_outputFile, Charset.defaultCharset());
return null;
}
}
);
ssc.start();
ssc.awaitTermination();
}
}
class GenerateType2Scores implements Function<String, Tuple3<Long, Double, String>> {
@Override
public Tuple3<Long, Double, String> call(String s) throws Exception{
Long time = Type2ViolationChecker.getMTS(s);
HashMap<String,FieldModel> temphm= Type2ViolationChecker.br.value();
Double score = Type2ViolationChecker.getAnomalyScore(temphm,s);
return new Tuple3<Long, Double, String>(time,score, s);}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment