Skip to content

Instantly share code, notes, and snippets.

@KKcorps
Created September 18, 2019 14:07
Show Gist options
  • Save KKcorps/b29d980d2ebf791fb06463311753e649 to your computer and use it in GitHub Desktop.
Save KKcorps/b29d980d2ebf791fb06463311753e649 to your computer and use it in GitHub Desktop.
public class TestCheckpointJobWithBootstrap {
public static void main(String[] args) throws Exception {
bootstrapConfig();
... rest same as previous snippet ...
}
static void bootstrapConfig() throws IOException {
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
ExistingSavepoint existingSavepoint = Savepoint.load(executionEnvironment, "oldSavepointPath", new MemoryStateBackend());
TestConfig testConfig = new TestConfig();
testConfig.setKey("global");
testConfig.setThresholdValue(10);
DataSet<TestConfig> configDataSet = executionEnvironment.fromElements(testConfig);
BootstrapTransformation<TestConfig> transformation = OperatorTransformation
.bootstrapWith(configDataSet)
.keyBy(TestConfig::getKey)
.transform(new ConfigBootstrapper());
String newSavepointPath = "newSavepointPath";
existingSavepoint.withOperator("classify_data", transformation).write(newSavepointPath);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment