This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TestCheckpointJob { | |
public static void main(String[] args) throws Exception { | |
bootstrapConfig(); | |
//Rest same as previous code | |
} | |
} | |
static void bootstrapConfig() throws IOException { | |
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class ConfigBootstrapper extends KeyedStateBootstrapFunction<String, TestConfig> { | |
ValueState<Integer> threshold; | |
@Override | |
public void open(Configuration parameters) throws Exception { | |
threshold = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("thresholdState", Integer.class)); | |
} | |
@Override | |
public void processElement(TestConfig testConfig, Context context) throws Exception { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
BootstrapTransformation<TestConfig> getConfigTransformation(ExecutionEnvironment executionEnvironment){ | |
TestConfig testConfig = new TestConfig(); | |
testConfig.setKey("global"); | |
testConfig.setThresholdValue(10); | |
DataSet<TestConfig> configDataSet = executionEnvironment.fromElements(testConfig); | |
BootstrapTransformation<TestConfig> transformation = OperatorTransformation | |
.bootstrapWith(configDataSet) | |
.keyBy(TestConfig::getKey) | |
.transform(new ConfigBootstrapper()); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
BootstrapTransformation<TestConfig> getConfigTransformation(ExecutionEnvironment executionEnvironment){ | |
TestConfig testConfig = new TestConfig(); | |
testConfig.setKey("global"); | |
testConfig.setThresholdValue(10); | |
DataSet<TestConfig> configDataSet = executionEnvironment.fromElements(testConfig); | |
BootstrapTransformation<TestConfig> transformation = OperatorTransformation | |
.bootstrapWith(configDataSet) | |
.keyBy(TestConfig::getKey) | |
.transform(new ConfigBootstrapper()); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
BootstrapTransformation<TestConfig> getConfigTransformation(ExecutionEnvironment executionEnvironment) { | |
TestConfig testConfig = new TestConfig(); | |
testConfig.setKey("global"); | |
testConfig.setThresholdValue(10); | |
DataSet<TestConfig> configDataSet = executionEnvironment.fromElements(testConfig); | |
return OperatorTransformation | |
.bootstrapWith(configDataSet) | |
.keyBy(TestConfig::getKey) | |
.transform(new ConfigBootstrapper()); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public static void main(String[] args) throws Exception { | |
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); | |
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); | |
ParameterTool configuration = ParameterTool.fromArgs(args); | |
FlinkKafkaConsumer010<String> kafkaConsumer010 = new FlinkKafkaConsumer010<String>("test", new SimpleStringSchema(), getKafkaConsumerProperties("testing123")); | |
DataStream<String> srcStream = env.addSource(kafkaConsumer010); | |
Random random = new Random(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class StatefulMapTest extends RichFlatMapFunction<KeyValue, String> { | |
ValueState<Integer> previousInt; | |
ValueState<Integer> nextInt; | |
@Override | |
public void open(Configuration parameters) throws Exception { | |
super.open(parameters); | |
previousInt = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("previousInt", Integer.class)); | |
nextInt = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("nextInt", Integer.class)); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class StatefulProcess extends KeyedProcessFunction<String, KeyValue, KeyValue> { | |
ValueState<Integer> processedInt; | |
@Override | |
public void open(Configuration parameters) throws Exception { | |
super.open(parameters); | |
processedInt = getRuntimeContext().getState(new ValueStateDescriptor<>("processedInt", Integer.class)); | |
} | |
@Override |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class TestRocksDB { | |
public static void main(String[] args) throws Exception { | |
RocksDB.loadLibrary(); | |
String previousIntColumnFamily = "previousInt"; | |
byte[] previousIntColumnFamilyBA = previousIntColumnFamily.getBytes(StandardCharsets.UTF_8); | |
String nextIntcolumnFamily = "nextInt"; | |
byte[] nextIntcolumnFamilyBA = nextIntcolumnFamily.getBytes(StandardCharsets.UTF_8); | |
try (final ColumnFamilyOptions cfOpts = new ColumnFamilyOptions().optimizeUniversalStyleCompaction()) { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
TypeInformation<Integer> resultType = TypeExtractor.createTypeInfo(Integer.class); | |
TypeSerializer<Integer> serializer = resultType.createSerializer(new ExecutionConfig()); | |
RocksIterator iterator = db.newIterator(columnFamilyHandle); | |
iterator.seekToFirst(); | |
iterator.status(); | |
while (iterator.isValid()) { | |
byte[] key = iterator.key(); | |
System.out.write(key); |