Skip to content

Instantly share code, notes, and snippets.

View KKcorps's full-sized avatar
🏠
Working from home

Kartik Khare KKcorps

🏠
Working from home
View GitHub Profile
/**
 * Job class whose {@code main} method bootstraps the configuration state before
 * running the rest of the job. Only the bootstrap call is shown in this snippet.
 */
public class TestCheckpointJob {
/**
 * Calls {@code bootstrapConfig()} first; the remainder of the job is omitted here.
 *
 * @param args command-line arguments (not read by the visible code)
 * @throws Exception propagated from {@code bootstrapConfig()} or from the omitted job code
 */
public static void main(String[] args) throws Exception {
bootstrapConfig();
// The rest is the same as the previous code (omitted in this snippet).
}
}
static void bootstrapConfig() throws IOException {
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
class ConfigBootstrapper extends KeyedStateBootstrapFunction<String, TestConfig> {
ValueState<Integer> threshold;
/**
 * Looks up the Integer value state registered under the name "thresholdState"
 * and keeps the handle in {@code threshold}.
 *
 * @param parameters configuration supplied by the caller; not read here
 * @throws Exception declared by the overridden signature
 */
@Override
public void open(Configuration parameters) throws Exception {
    ValueStateDescriptor<Integer> thresholdDescriptor =
            new ValueStateDescriptor<>("thresholdState", Integer.class);
    threshold = getRuntimeContext().getState(thresholdDescriptor);
}
@Override
public void processElement(TestConfig testConfig, Context context) throws Exception {
BootstrapTransformation<TestConfig> getConfigTransformation(ExecutionEnvironment executionEnvironment){
TestConfig testConfig = new TestConfig();
testConfig.setKey("global");
testConfig.setThresholdValue(10);
DataSet<TestConfig> configDataSet = executionEnvironment.fromElements(testConfig);
BootstrapTransformation<TestConfig> transformation = OperatorTransformation
.bootstrapWith(configDataSet)
.keyBy(TestConfig::getKey)
.transform(new ConfigBootstrapper());
BootstrapTransformation<TestConfig> getConfigTransformation(ExecutionEnvironment executionEnvironment){
TestConfig testConfig = new TestConfig();
testConfig.setKey("global");
testConfig.setThresholdValue(10);
DataSet<TestConfig> configDataSet = executionEnvironment.fromElements(testConfig);
BootstrapTransformation<TestConfig> transformation = OperatorTransformation
.bootstrapWith(configDataSet)
.keyBy(TestConfig::getKey)
.transform(new ConfigBootstrapper());
/**
 * Builds the bootstrap transformation for a single hard-coded {@code TestConfig}.
 *
 * <p>The config gets the key "global" and a threshold value of 10. It is wrapped in a
 * one-element {@code DataSet}, keyed by {@code TestConfig::getKey}, and passed to a new
 * {@code ConfigBootstrapper}.
 *
 * @param executionEnvironment environment used to create the {@code DataSet}
 * @return the transformation returned by the {@code OperatorTransformation} call chain
 */
BootstrapTransformation<TestConfig> getConfigTransformation(ExecutionEnvironment executionEnvironment) {
// Single configuration record that will be written into the bootstrapped state.
TestConfig testConfig = new TestConfig();
testConfig.setKey("global");
testConfig.setThresholdValue(10);
// One-element data set holding that record.
DataSet<TestConfig> configDataSet = executionEnvironment.fromElements(testConfig);
// Key by the config's key so ConfigBootstrapper processes it as keyed input.
return OperatorTransformation
.bootstrapWith(configDataSet)
.keyBy(TestConfig::getKey)
.transform(new ConfigBootstrapper());
public class StatefulMapTest extends RichFlatMapFunction<KeyValue, String> {
ValueState<Integer> previousInt;
ValueState<Integer> nextInt;
/**
 * Delegates to the superclass {@code open}, then fetches the two Integer value
 * states named "previousInt" and "nextInt" from the runtime context.
 *
 * @param parameters configuration forwarded to {@code super.open}
 * @throws Exception if the superclass {@code open} or a state lookup throws
 */
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    ValueStateDescriptor<Integer> previousDescriptor =
            new ValueStateDescriptor<>("previousInt", Integer.class);
    previousInt = getRuntimeContext().getState(previousDescriptor);

    ValueStateDescriptor<Integer> nextDescriptor =
            new ValueStateDescriptor<>("nextInt", Integer.class);
    nextInt = getRuntimeContext().getState(nextDescriptor);
}
public class StatefulProcess extends KeyedProcessFunction<String, KeyValue, KeyValue> {
ValueState<Integer> processedInt;
/**
 * Delegates to the superclass {@code open}, then fetches the Integer value state
 * named "processedInt" and stores the handle in {@code processedInt}.
 *
 * @param parameters configuration forwarded to {@code super.open}
 * @throws Exception if the superclass {@code open} or the state lookup throws
 */
@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);

    ValueStateDescriptor<Integer> processedDescriptor =
            new ValueStateDescriptor<>("processedInt", Integer.class);
    processedInt = getRuntimeContext().getState(processedDescriptor);
}
@Override
public class TestRocksDB {
public static void main(String[] args) throws Exception {
RocksDB.loadLibrary();
String previousIntColumnFamily = "previousInt";
byte[] previousIntColumnFamilyBA = previousIntColumnFamily.getBytes(StandardCharsets.UTF_8);
String nextIntcolumnFamily = "nextInt";
byte[] nextIntcolumnFamilyBA = nextIntcolumnFamily.getBytes(StandardCharsets.UTF_8);
try (final ColumnFamilyOptions cfOpts = new ColumnFamilyOptions().optimizeUniversalStyleCompaction()) {
TypeInformation<Integer> resultType = TypeExtractor.createTypeInfo(Integer.class);
TypeSerializer<Integer> serializer = resultType.createSerializer(new ExecutionConfig());
RocksIterator iterator = db.newIterator(columnFamilyHandle);
iterator.seekToFirst();
iterator.status();
while (iterator.isValid()) {
byte[] key = iterator.key();
System.out.write(key);