Skip to content

Instantly share code, notes, and snippets.

@charles-tan
Created March 4, 2024 20:46
Show Gist options
  • Save charles-tan/50aab36b8555ed1555d32a5998f6324a to your computer and use it in GitHub Desktop.
Save charles-tan/50aab36b8555ed1555d32a5998f6324a to your computer and use it in GitHub Desktop.
/**
 * Small command-line utility that reads the {@code "SourceReaderState"} list state of the
 * operator with uid {@code "kafkasourceuid"} from a checkpoint directory and prints each
 * entry as a {@link KafkaPartitionSplit}.
 *
 * <p>The checkpoint location and operator uid are hard-coded; edit them before running.
 */
public class StateProcessorTest {

    /**
     * Number of leading bytes dropped from every raw state entry before it is handed to
     * {@link KafkaPartitionSplitSerializer}.
     *
     * <p>NOTE(review): presumably this is the version + length prefix that Flink writes in
     * front of each versioned state entry (two 4-byte ints) -- confirm against the Flink
     * version that produced the checkpoint.
     */
    private static final int STATE_HEADER_BYTES = 8;

    /**
     * Reads the list state, deserializes every entry and prints it to standard output,
     * followed by a final {@code DONE} line.
     *
     * @param args ignored
     * @throws Exception if the state cannot be read, an entry cannot be deserialized, or the
     *     result iterator fails to close
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String savepointPath =
                Path.of("/tmp/checkpoints/609bc335486ca6cfcc8692e4c1ff8782/chk-8").toString();
        SavepointReader savepoint =
                SavepointReader.read(env, savepointPath, new HashMapStateBackend());

        DataStream<byte[]> listState = savepoint.readListState(
                OperatorIdentifier.forUid("kafkasourceuid"),
                "SourceReaderState",
                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);

        // One serializer instance is enough; it is only used for deserialization below.
        KafkaPartitionSplitSerializer serializer = new KafkaPartitionSplitSerializer();

        // try-with-resources so the iterator (and whatever backs the collect job) is closed
        // even when deserialization of an entry throws.
        try (CloseableIterator<byte[]> states = listState.executeAndCollect()) {
            while (states.hasNext()) {
                byte[] raw = states.next();
                if (raw.length < STATE_HEADER_BYTES) {
                    // Fail with a message that names the problem instead of the opaque
                    // IllegalArgumentException Arrays.copyOfRange would raise.
                    throw new IllegalStateException(
                            "State entry is " + raw.length + " bytes, expected at least "
                                    + STATE_HEADER_BYTES);
                }

                // NOTE(review): the serializer's current version is used rather than any
                // version stored in the skipped prefix; this only works when the checkpoint
                // was written with the same serializer version -- verify.
                byte[] payload = Arrays.copyOfRange(raw, STATE_HEADER_BYTES, raw.length);
                KafkaPartitionSplit split =
                        serializer.deserialize(serializer.getVersion(), payload);

                System.out.println(
                        String.format("topic=%s, partition=%s, startingOffset=%s, stoppingOffset=%s, topicPartition=%s",
                                split.getTopic(), split.getPartition(),
                                split.getStartingOffset(), split.getStoppingOffset(), split.getTopicPartition()));
            }
        }
        System.out.println("DONE");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment