@KKcorps
Last active September 23, 2019 15:42
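import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

// Keyed process function that remembers the last successfully parsed integer per key.
// KeyValue is the gist author's own type (not shown here); getValue() must return a String.
public class StatefulProcess extends KeyedProcessFunction<String, KeyValue, KeyValue> {

    // Keyed state: the most recent parsed value for the current key.
    private transient ValueState<Integer> processedInt;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        processedInt = getRuntimeContext().getState(
                new ValueStateDescriptor<>("processedInt", Integer.class));
    }

    @Override
    public void processElement(KeyValue keyValue, Context context, Collector<KeyValue> collector) throws Exception {
        try {
            int parsed = Integer.parseInt(keyValue.getValue());
            processedInt.update(parsed);
            collector.collect(keyValue);
        } catch (NumberFormatException e) {
            // Non-numeric values are logged and dropped; state is left unchanged.
            // Other failures (e.g. state backend errors) propagate so Flink can handle them.
            e.printStackTrace();
        }
    }
}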