Skip to content

Instantly share code, notes, and snippets.

@KKcorps
Last active September 23, 2019 15:42
Show Gist options
  • Save KKcorps/16e43f6c8414c2225d5362d69ec5a660 to your computer and use it in GitHub Desktop.
Save KKcorps/16e43f6c8414c2225d5362d69ec5a660 to your computer and use it in GitHub Desktop.
/**
 * Flink flat-map function that, for every incoming {@link KeyValue}, parses its value as an
 * integer and emits one string describing it relative to the previously seen integer.
 *
 * <ul>
 *   <li>When no previous integer is stored, it emits {@code "OLD INT: <value>"}.</li>
 *   <li>Otherwise it emits {@code "NEW INT: <value - previous>"}.</li>
 * </ul>
 *
 * <p>Two {@link ValueState} handles are used: {@code previousInt} holds the last parsed input,
 * and {@code nextInt} holds the last emitted number. {@code nextInt} is only written here and
 * never read back by this class. Both are obtained through {@code getRuntimeContext().getState},
 * i.e. Flink keyed state, so per the Flink state documentation this function is expected to run
 * on a keyed stream.
 */
public class StatefulMapTest extends RichFlatMapFunction<KeyValue, String> {

    /** Last successfully parsed input integer; its value is {@code null} until a record is seen. */
    ValueState<Integer> previousInt;

    /** Last number emitted by {@link #flatMap}; written but not read by this class. */
    ValueState<Integer> nextInt;

    /**
     * Registers the two state handles with the runtime context.
     *
     * @param parameters configuration forwarded to the superclass
     * @throws Exception if the superclass initialisation or the state lookup fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        previousInt = getRuntimeContext().getState(
                new ValueStateDescriptor<Integer>("previousInt", Integer.class));
        nextInt = getRuntimeContext().getState(
                new ValueStateDescriptor<Integer>("nextInt", Integer.class));
    }

    /**
     * Parses the record's value, emits the raw value or its delta against the stored previous
     * value, then updates both state handles.
     *
     * <p>Records whose value is not a valid integer are skipped: the stack trace is printed,
     * nothing is emitted and no state is touched. Any other failure (state access, downstream
     * collector) is propagated instead of being swallowed, so the runtime can see the error
     * rather than silently losing the record and leaving state stale.
     *
     * @param s         incoming record; its {@code getValue()} is parsed as a decimal integer
     * @param collector sink receiving exactly one string per successfully parsed record
     * @throws Exception if reading or updating state, or emitting downstream, fails
     */
    @Override
    public void flatMap(KeyValue s, Collector<String> collector) throws Exception {
        final int current;
        try {
            current = Integer.parseInt(s.getValue());
        } catch (NumberFormatException e) {
            // Best-effort: a malformed (or null) value only drops this one record.
            e.printStackTrace();
            return;
        }

        // Read the state once; each value() call may hit the state backend.
        final Integer previous = previousInt.value();

        final int emitted;
        if (previous == null) {
            emitted = current;
            collector.collect("OLD INT: " + current);
        } else {
            // Plain int subtraction: wraps around on overflow, same as the original arithmetic.
            emitted = current - previous;
            collector.collect("NEW INT: " + emitted);
        }

        nextInt.update(emitted);
        previousInt.update(current);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment