Skip to content

Instantly share code, notes, and snippets.

@KKcorps
Last active September 18, 2019 14:24
Show Gist options
  • Save KKcorps/514806a3121847e8f8f3c44c711cd537 to your computer and use it in GitHub Desktop.
Save KKcorps/514806a3121847e8f8f3c44c711cd537 to your computer and use it in GitHub Desktop.
class ClassifyData extends RichFlatMapFunction<TestData, LabeledTestData>{
ValueState<Integer> threshold;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
threshold = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("thresholdState", Integer.class));
}
@Override
public void flatMap(TestData testData, Collector<LabeledTestData> collector) throws Exception {
LabeledTestData labeledTestData = new LabeledTestData();
labeledTestData.setKey(testData.getKey());
labeledTestData.setValue(testData.getValue());
labeledTestData.setCreatedAt(testData.getCreatedAt());
String label = "UNCLASSIFIED";
if(threshold.value() != null){
label = (testData.getValue() > threshold.value()) ? "L1" : "L2";
}
labeledTestData.setLabel(label);
collector.collect(labeledTestData);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment