@gyfora
Created March 9, 2017 14:18
Reproduce
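import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ExampleProgram {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read lines from a local text socket (start a listener first, e.g. `nc -lk 9999`).
        env.socketTextStream("localhost", 9999)
            .shuffle()
            .filter(new StatefulFilter()).uid("f")
            .shuffle()
            // .map(x -> x)
            .print();

        // Checkpoint every 1000 ms, keeping state in the in-memory backend.
        CheckpointConfig cfg = env.getCheckpointConfig();
        cfg.setCheckpointInterval(1000);
        env.setStateBackend(new MemoryStateBackend());

        env.execute();
    }

    // Pass-through filter that checkpoints a single dummy element as operator state.
    private static class StatefulFilter implements FilterFunction<String>, ListCheckpointed<String> {

        @Override
        public boolean filter(String arg0) throws Exception {
            return true;
        }

        @Override
        public void restoreState(List<String> state) throws Exception {}

        @Override
        public List<String> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList("a");
        }
    }
}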