Skip to content

Instantly share code, notes, and snippets.

@asardaes
Last active April 11, 2022 15:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save asardaes/eaf21f18860ec39b325a40acef2db678 to your computer and use it in GitHub Desktop.
Save asardaes/eaf21f18860ec39b325a40acef2db678 to your computer and use it in GitHub Desktop.
Flink State Processor API Sample
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-10946
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-11161
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-11769
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-11943
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-13718
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-13913
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-13999
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-14096
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-14455
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-14461
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-14477
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-14857
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-20855
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-20912
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-2514
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-2530
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-25909
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-2701
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-2704
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-2705
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-276
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-2792
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-3085
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-3213
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-378
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-43457
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-43475
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-43483
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-43585
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-445
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-448
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-454
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-522
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-614
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-62338
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-672
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-6853
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-8869
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-8878
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-8888
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9076
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9105
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9107
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9169
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9176
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9252
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9256
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9258
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9260
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9307
drwxrwxrwx 2 vm vm 69632 Apr 7 14:15 shared
drwxrwxrwx 2 vm vm 4096 Apr 7 11:40 taskowned
$ du -Lhd 1 /opt/flink/state/checkpoints/00000000000000000000000000000000/
600K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-614
460K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-2704
424K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9169
660K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-43457
855M /opt/flink/state/checkpoints/00000000000000000000000000000000/shared
468K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-6853
856K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-14461
492K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-378
428K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9256
428K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-10946
592K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-43585
4.3M /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-62338
460K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-11769
456K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-11943
428K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9258
620K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-2530
660K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-43483
420K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-2701
476K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-8869
488K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9076
660K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-14857
592K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-25909
428K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9260
428K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-3213
468K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-8888
516K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-13999
604K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-2514
456K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-2705
764K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-14477
660K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-43475
428K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9307
468K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-8878
752K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-14455
520K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-448
476K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-2792
424K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9176
520K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-276
492K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-13913
420K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-3085
476K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9105
712K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-522
476K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9107
552K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-445
428K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9252
756K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-20912
560K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-454
732K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-20855
596K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-672
4.0K /opt/flink/state/checkpoints/00000000000000000000000000000000/taskowned
456K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-13718
468K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-11161
504K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-14096
885M /opt/flink/state/checkpoints/00000000000000000000000000000000/
// Kotlin POJO emitted by the reader functions in the Java sample below
import java.io.Serializable
/**
 * Counters emitted by the state reader functions and summed up per operator.
 *
 * Every property is a `var` with a default value so that the generated JVM class has a
 * no-argument constructor plus a getter and a setter for each field, which is what Flink
 * requires in order to treat a class as a POJO instead of falling back to generic
 * serialization.
 *
 * @property windowState number of per-window state entries that were counted
 * @property globalState number of global (per-key) state entries that were counted
 * @property windows number of windows that were read
 * @property dtos number of elements found in the window contents
 * @property dtoBytes accumulated size, in bytes, measured for the window elements
 */
data class StateCounts(
    var windowState: Int = 0,
    var globalState: Int = 0,
    var windows: Int = 0,
    var dtos: Int = 0,
    // Was `val`: a read-only property has no setter, which disqualifies the class as a Flink POJO.
    var dtoBytes: Long = 0L,
) : Serializable {
    companion object {
        private const val serialVersionUID = 1L
    }
}
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.WindowReaderFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.openjdk.jol.info.ClassLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Sample that loads a Flink checkpoint through the State Processor API, reads the window
 * contents and the associated keyed state of three operators, and logs the aggregated counts.
 */
public class StateProcessing {

    // Fixed: the logger used to be created for StateProcessingTest, a class this file does not define.
    private static final Logger LOG = LoggerFactory.getLogger(StateProcessing.class);

    /** Location of the checkpoint that is inspected. */
    private static final String CHECKPOINT_PATH =
            "file:/opt/flink/state/checkpoints/00000000000000000000000000000000/chk-62338";

    /**
     * Loads the checkpoint and logs the summed {@link StateCounts} of the operators with the
     * uids {@code stream1}, {@code stream2} and {@code stream3}.
     *
     * @throws Exception if the checkpoint cannot be loaded or one of the read jobs fails
     */
    void playground() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        ExistingSavepoint savepoint = Savepoint.load(
                env,
                CHECKPOINT_PATH,
                new EmbeddedRocksDBStateBackend(true)
        );

        StateCounts counts = countStates(savepoint, "stream1", new StateReaderFunction1());
        // StateCounts(windowState=9, globalState=0, windows=114, dtos=1259, dtoBytes=60432)
        LOG.info("State counts for stream1: {}", counts);

        counts = countStates(savepoint, "stream2", new StateReaderFunction2());
        // StateCounts(windowState=0, globalState=277, windows=114, dtos=1259)
        LOG.info("State counts for stream2: {}", counts);

        counts = countStates(savepoint, "stream3", new StateReaderFunction3());
        // StateCounts(windowState=0, globalState=202, windows=114, dtos=1259)
        LOG.info("State counts for stream3: {}", counts);
    }

    /**
     * Reads the sliding event-time windows (11 minutes long, sliding by 1 minute) of one
     * operator and sums the counts emitted by {@code reader}.
     *
     * @param savepoint the loaded checkpoint or savepoint
     * @param uid uid of the operator whose window state is read
     * @param reader function that turns each window into a {@link StateCounts}
     * @return the field-wise sum of all emitted counts; all zeros when nothing was emitted
     * @throws Exception if the read job fails
     */
    private static StateCounts countStates(
            ExistingSavepoint savepoint,
            String uid,
            WindowReaderFunction<Pojo, StateCounts, String, TimeWindow> reader) throws Exception {
        List<StateCounts> perWindow = savepoint
                .window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L)))
                .process(
                        uid,
                        reader,
                        Types.STRING,
                        TypeInformation.of(Pojo.class),
                        TypeInformation.of(StateCounts.class)
                )
                .collect();

        // A sequential stream is enough for the collected list; the combiner is associative
        // and the identity is all zeros, so the result equals that of a parallel reduction.
        return perWindow.stream().reduce(new StateCounts(), StateProcessing::sum);
    }

    /** Returns a new {@link StateCounts} holding the field-wise sum of both arguments. */
    private static StateCounts sum(StateCounts left, StateCounts right) {
        return new StateCounts(
                left.getWindowState() + right.getWindowState(),
                left.getGlobalState() + right.getGlobalState(),
                left.getWindows() + right.getWindows(),
                left.getDtos() + right.getDtos(),
                left.getDtoBytes() + right.getDtoBytes()
        );
    }

    /**
     * Counts the window elements, their measured instance sizes, and the entries of the
     * per-window list state {@code LSD1}.
     */
    public static class StateReaderFunction1 extends WindowReaderFunction<Pojo, StateCounts, String, TimeWindow> {

        private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>(
                "LSD1",
                Integer.class
        );

        /**
         * Emits one {@link StateCounts} for the window: the number of per-window state entries,
         * one window, the number of elements and the sum of their instance sizes.
         */
        @Override
        public void readWindow(String s, Context<TimeWindow> context, Iterable<Pojo> elements, Collector<StateCounts> out) throws Exception {
            int dtoCount = 0;
            long dtoBytes = 0L;
            for (Pojo element : elements) {
                dtoCount++;
                // Instance size as reported by JOL for this object.
                dtoBytes += ClassLayout.parseInstance(element).instanceSize();
            }

            int stateCount = 0;
            for (Integer ignored : context.windowState().getListState(LSD).get()) {
                stateCount++;
            }

            out.collect(new StateCounts(stateCount, 0, 1, dtoCount, dtoBytes));
        }
    }

    /**
     * Counts the window elements and, once per key, the total number of strings stored in the
     * global map state {@code MSD}.
     */
    public static class StateReaderFunction2 extends WindowReaderFunction<Pojo, StateCounts, String, TimeWindow> {

        private static final MapStateDescriptor<Long, List<String>> MSD = new MapStateDescriptor<>(
                "MSD",
                TypeInformation.of(Long.class),
                TypeInformation.of(new TypeHint<List<String>>() {})
        );

        /**
         * Global-state count per key that has already been reported. A key that is present here
         * contributes 0 on later calls, so its global state is not counted again.
         * NOTE(review): the map is static and never cleared, so a second run in the same JVM
         * would report 0 for keys seen in the first run.
         */
        private static final ConcurrentMap<String, Integer> GLOBAL_COUNTS = new ConcurrentHashMap<>();

        static {
            // what do you know, TTL changes the serializer
            MSD.enableTimeToLive(StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.hours(1L))
                    .cleanupInRocksdbCompactFilter(1000L)
                    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                    .build());
        }

        /**
         * Emits one {@link StateCounts} for the window. The global-state count is non-zero only
         * on the first call for key {@code s}.
         */
        @Override
        public void readWindow(String s, Context<TimeWindow> context, Iterable<Pojo> elements, Collector<StateCounts> out) {
            int dtoCount = 0;
            for (Pojo ignored : elements) {
                dtoCount++;
            }

            // Stays 0 unless the mapping function below runs, i.e. unless the key is new.
            final AtomicInteger stateCount = new AtomicInteger();
            GLOBAL_COUNTS.computeIfAbsent(s, (ignored) -> {
                MapState<Long, List<String>> mapState = context.globalState().getMapState(MSD);
                int count = 0;
                try {
                    for (Map.Entry<Long, List<String>> entry : mapState.entries()) {
                        count += entry.getValue().size();
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                stateCount.set(count);
                return count;
            });

            out.collect(new StateCounts(0, stateCount.get(), 1, dtoCount, 0L));
        }
    }

    /**
     * Counts the window elements and, once per key, the entries of the keyed list state
     * {@code LSD2} obtained from the runtime context.
     */
    public static class StateReaderFunction3 extends WindowReaderFunction<Pojo, StateCounts, String, TimeWindow> {

        private static final ListStateDescriptor<StateDTO> LSD = new ListStateDescriptor<>(
                "LSD2",
                StateDTO.class
        );

        /**
         * Global-state count per key that has already been reported. A key that is present here
         * contributes 0 on later calls, so its list state is not counted again.
         * NOTE(review): the map is static and never cleared, so a second run in the same JVM
         * would report 0 for keys seen in the first run.
         */
        private static final ConcurrentMap<String, Integer> GLOBAL_COUNTS = new ConcurrentHashMap<>();

        static {
            LSD.enableTimeToLive(StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.hours(1L))
                    .cleanupInRocksdbCompactFilter(1000L)
                    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
                    .setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp)
                    .build());
        }

        /** Handle to the keyed list state, registered in {@link #open(Configuration)}. */
        private transient ListState<StateDTO> listState;

        /** Registers the keyed list state with the runtime context. */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            listState = getRuntimeContext().getListState(LSD);
        }

        /**
         * Emits one {@link StateCounts} for the window. The list-state count is non-zero only
         * on the first call for key {@code s}.
         */
        @Override
        public void readWindow(String s, Context<TimeWindow> context, Iterable<Pojo> elements, Collector<StateCounts> out) {
            int dtoCount = 0;
            for (Pojo ignored : elements) {
                dtoCount++;
            }

            // Stays 0 unless the mapping function below runs, i.e. unless the key is new.
            final AtomicInteger stateCount = new AtomicInteger();
            GLOBAL_COUNTS.computeIfAbsent(s, (ignoredKey) -> {
                int count = 0;
                try {
                    for (StateDTO ignored : listState.get()) {
                        count++;
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                stateCount.set(count);
                return count;
            });

            out.collect(new StateCounts(0, stateCount.get(), 1, dtoCount, 0L));
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment