
public class StateProcessorTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String savepointPath = Path.of("/tmp/checkpoints/609bc335486ca6cfcc8692e4c1ff8782/chk-8").toString();
        SavepointReader savepoint = SavepointReader.read(env, savepointPath, new HashMapStateBackend());
        // The Kafka source operator stores its splits as raw byte[] elements in list state.
        DataStream<byte[]> listState = savepoint.readListState(
                OperatorIdentifier.forUid("kafkasourceuid"),
                "SourceReaderState",
                PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO);
        listState.print();
        env.execute();
    }
}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-state-processor-api</artifactId>
    <version>1.18.0</version>
</dependency>
public static <T> void writeVersionAndSerialize(
        SimpleVersionedSerializer<T> serializer, T datum, DataOutputView out)
        throws IOException {
    checkNotNull(serializer, "serializer");
    checkNotNull(datum, "datum");
    checkNotNull(out, "out");

    final byte[] data = serializer.serialize(datum);

    out.writeInt(serializer.getVersion());
    out.writeInt(data.length);
    out.write(data);
}
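The state bytes read by StateProcessorTest follow exactly this layout: a 4-byte version, a 4-byte length, then the serializer's payload. SimpleVersionedSerialization's matching read helper undoes it (a lightly paraphrased sketch):

public static <T> T readVersionAndDeSerialize(
        SimpleVersionedSerializer<T> serializer, DataInputView in) throws IOException {
    final int version = in.readInt();
    final int length = in.readInt();
    final byte[] data = new byte[length];
    in.readFully(data);
    return serializer.deserialize(version, data);
}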
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    // SourceOperator wraps the raw byte[] operator state with the source's split serializer.
    final ListState<byte[]> rawState =
            context.getOperatorStateStore().getListState(SPLITS_STATE_DESC);
    readerState = new SimpleVersionedListState<>(rawState, splitSerializer);
}

// This descriptor is why the state name to read above is "SourceReaderState".
static final ListStateDescriptor<byte[]> SPLITS_STATE_DESC =
        new ListStateDescriptor<>("SourceReaderState", BytePrimitiveArraySerializer.INSTANCE);
@Override
public byte[] serialize(KafkaPartitionSplit split) throws IOException {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos)) {
        out.writeUTF(split.getTopic());
        out.writeInt(split.getPartition());
        out.writeLong(split.getStartingOffset());
        out.writeLong(split.getStoppingOffset().orElse(KafkaPartitionSplit.NO_STOPPING_OFFSET));
        out.flush();
        return baos.toByteArray();
    }
}
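The inverse read path (lightly paraphrased from the same KafkaPartitionSplitSerializer) shows what each decoded split contains:

@Override
public KafkaPartitionSplit deserialize(int version, byte[] serialized) throws IOException {
    try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
            DataInputStream in = new DataInputStream(bais)) {
        String topic = in.readUTF();
        int partition = in.readInt();
        long startingOffset = in.readLong();
        long stoppingOffset = in.readLong();
        return new KafkaPartitionSplit(
                new TopicPartition(topic, partition), startingOffset, stoppingOffset);
    }
}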
public class FlinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("source")
                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                .build();
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
                .uid("kafkasourceuid") // must match the uid passed to OperatorIdentifier.forUid
                .print();
        env.execute();
    }
}
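For a retained checkpoint like /tmp/checkpoints/&lt;job-id&gt;/chk-8 to exist after the job stops, checkpointing must write to that directory and retain checkpoints on cancellation. A minimal sketch (the 10-second interval is an assumption):

env.enableCheckpointing(10_000); // assumed interval
env.getCheckpointConfig().setCheckpointStorage("file:///tmp/checkpoints");
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);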
{
  "window_start": "2023-12-18 21:14:45",
  "window_end": "2023-12-18 21:16:45",
  "txns_cnt": 368,
  "min_priority_fee": 50000000,
  "max_priority_fee": 32250000000,
  "avg_priority_fee": 859790456,
  "priority_fee_stddev": 97003259
}
CREATE CHANGELOG eth_txns_priority_fee_analysis AS
SELECT
    window_start,
    window_end,
    COUNT(*) AS txns_cnt,
    MIN("maxPriorityFeePerGas") AS min_priority_fee,
    MAX("maxPriorityFeePerGas") AS max_priority_fee,
    AVG("maxPriorityFeePerGas") AS avg_priority_fee,
    STDDEV_SAMP("maxPriorityFeePerGas") AS priority_fee_stddev
FROM HOP(eth_txns_filtered, SIZE 2 MINUTES, ADVANCE BY 15 SECONDS)
GROUP BY window_start, window_end;
CREATE STREAM eth_txns_filtered AS
SELECT
    "txn_hash",
    "block_ts",
    "blockNumber",
    "maxPriorityFeePerGas",
    "gas"
FROM eth_txns WITH ('source.deserialization.error.handling' = 'IGNORE')
WHERE "maxPriorityFeePerGas" > 100 AND "maxPriorityFeePerGas" < "maxFeePerGas";