Skip to content

Instantly share code, notes, and snippets.

@alpinegizmo
Last active November 23, 2020 10:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alpinegizmo/ff3d2e748287853c88f21259830b29cf to your computer and use it in GitHub Desktop.
Save alpinegizmo/ff3d2e748287853c88f21259830b29cf to your computer and use it in GitHub Desktop.
Create a Flink savepoint by bootstrapping its state
package com.ververica.flinktraining.examples.datastream_java.state;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
/**
 * Creates a Flink savepoint from scratch by bootstrapping keyed state out of a
 * small in-memory data set, using the State Processor API.
 */
public class Bootstrap {

    /**
     * Assembles the bootstrap transformation over the elements 1, 2 and 3, registers it
     * under the operator uid {@code "my-operator-uid"}, and executes the batch job.
     *
     * @param args command-line arguments; not read
     * @throws Exception if assembling or executing the batch job fails
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Each integer is keyed by its string form and handed to SimplestTransform.
        BootstrapTransformation<Integer> bootstrap = OperatorTransformation
                .bootstrapWith(env.fromElements(1, 2, 3))
                .keyBy(String::valueOf)
                .transform(new SimplestTransform());

        FsStateBackend backend = new FsStateBackend("file:///tmp/checkpoints");

        // 256 is the second argument of Savepoint.create (max parallelism in the
        // State Processor API).
        Savepoint
                .create(backend, 256)
                .withOperator("my-operator-uid", bootstrap)
                .write("file:///tmp/savepoints/");

        // Run the batch job assembled above.
        env.execute();
    }

    /**
     * Bootstrap function for String keys and Integer inputs: every incoming value is
     * stored in the {@code ValueState} registered under the name {@code "total"}.
     */
    public static class SimplestTransform extends KeyedStateBootstrapFunction<String, Integer> {

        /** Handle to the "total" state; assigned in {@link #open(Configuration)}. */
        ValueState<Integer> state;

        /**
         * Obtains the "total" state handle from the runtime context.
         *
         * @param parameters configuration passed by the framework; not read here
         */
        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Integer> totalDescriptor =
                    new ValueStateDescriptor<>("total", Types.INT);
            state = getRuntimeContext().getState(totalDescriptor);
        }

        /**
         * Overwrites the state with the given element.
         *
         * @param value the element to store
         * @param ctx   context supplied by the framework; not used
         * @throws Exception if the state update fails
         */
        @Override
        public void processElement(Integer value, Context ctx) throws Exception {
            state.update(value);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment