Last active
April 11, 2022 15:10
-
-
Save asardaes/eaf21f18860ec39b325a40acef2db678 to your computer and use it in GitHub Desktop.
Flink State Processor API Sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-10946 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-11161 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-11769 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-11943 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-13718 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-13913 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-13999 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-14096 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-14455 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-14461 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-14477 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-14857 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-20855 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-20912 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-2514 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-2530 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-25909 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-2701 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-2704 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-2705 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-276 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-2792 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-3085 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-3213 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-378 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-43457 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-43475 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-43483 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-43585 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-445 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-448 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-454 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-522 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-614 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-62338 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-672 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-6853 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-8869 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-8878 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-8888 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9076 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9105 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9107 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9169 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9176 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9252 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9256 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9258 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9260 | |
drwxrwxrwx 2 vm vm 4096 Apr 7 14:14 chk-9307 | |
drwxrwxrwx 2 vm vm 69632 Apr 7 14:15 shared | |
drwxrwxrwx 2 vm vm 4096 Apr 7 11:40 taskowned |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
$ du -Lhd 1 /opt/flink/state/checkpoints/00000000000000000000000000000000/ | |
600K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-614 | |
460K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-2704 | |
424K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9169 | |
660K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-43457 | |
855M /opt/flink/state/checkpoints/00000000000000000000000000000000/shared | |
468K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-6853 | |
856K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-14461 | |
492K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-378 | |
428K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9256 | |
428K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-10946 | |
592K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-43585 | |
4.3M /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-62338 | |
460K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-11769 | |
456K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-11943 | |
428K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9258 | |
620K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-2530 | |
660K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-43483 | |
420K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-2701 | |
476K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-8869 | |
488K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9076 | |
660K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-14857 | |
592K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-25909 | |
428K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9260 | |
428K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-3213 | |
468K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-8888 | |
516K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-13999 | |
604K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-2514 | |
456K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-2705 | |
764K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-14477 | |
660K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-43475 | |
428K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9307 | |
468K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-8878 | |
752K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-14455 | |
520K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-448 | |
476K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-2792 | |
424K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9176 | |
520K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-276 | |
492K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-13913 | |
420K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-3085 | |
476K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9105 | |
712K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-522 | |
476K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9107 | |
552K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-445 | |
428K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-9252 | |
756K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-20912 | |
560K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-454 | |
732K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-20855 | |
596K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-672 | |
4.0K /opt/flink/state/checkpoints/00000000000000000000000000000000/taskowned | |
456K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-13718 | |
468K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-11161 | |
504K /opt/flink/state/checkpoints/00000000000000000000000000000000/chk-14096 | |
885M /opt/flink/state/checkpoints/00000000000000000000000000000000/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// kotlin POJO | |
import java.io.Serializable | |
/**
 * Counters emitted by the window state reader functions and summed per operator.
 *
 * Every property is a `var` with a default value. This gives the class a public no-arg
 * constructor and a getter/setter pair for every field, which Flink's POJO type extraction
 * requires. A read-only property would make `TypeInformation.of(StateCounts::class.java)`
 * fall back to generic (Kryo) serialization.
 *
 * @property windowState number of entries found in per-window state
 * @property globalState number of entries found in per-key (global) state
 * @property windows number of windows visited
 * @property dtos number of buffered window elements
 * @property dtoBytes summed byte size of the buffered window elements, as computed by the reader
 */
data class StateCounts(
    var windowState: Int = 0,
    var globalState: Int = 0,
    var windows: Int = 0,
    var dtos: Int = 0,
    // Was `val`: the missing setter broke Flink POJO recognition and was inconsistent with the rest.
    var dtoBytes: Long = 0L,
) : Serializable {
    companion object {
        private const val serialVersionUID = 1L
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;
import org.apache.flink.state.api.functions.WindowReaderFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.openjdk.jol.info.ClassLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
public class StateProcessing { | |
private static final Logger LOG = LoggerFactory.getLogger(StateProcessingTest.class); | |
void playground() throws Exception { | |
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); | |
env.setParallelism(1); | |
ExistingSavepoint savepoint = Savepoint.load( | |
env, | |
"file:/opt/flink/state/checkpoints/00000000000000000000000000000000/chk-62338", | |
new EmbeddedRocksDBStateBackend(true) | |
); | |
StateCounts counts = savepoint | |
.window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L))) | |
.process( | |
"stream1", | |
new StateReaderFunction1(), | |
Types.STRING, | |
TypeInformation.of(Pojo.class), | |
TypeInformation.of(StateCounts.class) | |
) | |
.collect() | |
.parallelStream() | |
.reduce(new StateCounts(), (left, right) -> new StateCounts( | |
left.getWindowState() + right.getWindowState(), | |
left.getGlobalState() + right.getGlobalState(), | |
left.getWindows() + right.getWindows(), | |
left.getDtos() + right.getDtos(), | |
left.getDtoBytes() + right.getDtoBytes() | |
)); | |
// StateCounts(windowState=9, globalState=0, windows=114, dtos=1259, dtoBytes=60432) | |
LOG.info("State counts for stream1: {}", counts); | |
counts = savepoint | |
.window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L))) | |
.process( | |
"stream2", | |
new StateReaderFunction2(), | |
Types.STRING, | |
TypeInformation.of(Pojo.class), | |
TypeInformation.of(StateCounts.class) | |
) | |
.collect() | |
.parallelStream() | |
.reduce(new StateCounts(), (left, right) -> new StateCounts( | |
left.getWindowState() + right.getWindowState(), | |
left.getGlobalState() + right.getGlobalState(), | |
left.getWindows() + right.getWindows(), | |
left.getDtos() + right.getDtos(), | |
left.getDtoBytes() + right.getDtoBytes() | |
)); | |
// StateCounts(windowState=0, globalState=277, windows=114, dtos=1259) | |
LOG.info("State counts for stream2: {}", counts); | |
counts = savepoint | |
.window(SlidingEventTimeWindows.of(Time.minutes(11L), Time.minutes(1L))) | |
.process( | |
"stream3", | |
new StateReaderFunction3(), | |
Types.STRING, | |
TypeInformation.of(Pojo.class), | |
TypeInformation.of(StateCounts.class) | |
) | |
.collect() | |
.parallelStream() | |
.reduce(new StateCounts(), (left, right) -> new StateCounts( | |
left.getWindowState() + right.getWindowState(), | |
left.getGlobalState() + right.getGlobalState(), | |
left.getWindows() + right.getWindows(), | |
left.getDtos() + right.getDtos(), | |
left.getDtoBytes() + right.getDtoBytes() | |
)); | |
// StateCounts(windowState=0, globalState=202, windows=114, dtos=1259) | |
LOG.info("State counts for stream3: {}", counts); | |
} | |
public static class StateReaderFunction1 extends WindowReaderFunction<Pojo, StateCounts, String, TimeWindow> { | |
private static final ListStateDescriptor<Integer> LSD = new ListStateDescriptor<>( | |
"LSD1", | |
Integer.class | |
); | |
@Override | |
public void readWindow(String s, Context<TimeWindow> context, Iterable<Pojo> elements, Collector<StateCounts> out) throws Exception { | |
int dtoCount = 0; | |
long dtoBytes = 0L; | |
for (Pojo element : elements) { | |
dtoCount++; | |
dtoBytes += ClassLayout.parseInstance(element).instanceSize(); | |
} | |
int stateCount = 0; | |
for (Integer i : context.windowState().getListState(LSD).get()) { | |
stateCount++; | |
} | |
StateCounts stateCounts = new StateCounts(stateCount, 0, 1, dtoCount, dtoBytes); | |
out.collect(stateCounts); | |
} | |
} | |
public static class StateReaderFunction2 extends WindowReaderFunction<Pojo, StateCounts, String, TimeWindow> { | |
private static final MapStateDescriptor<Long, List<String>> MSD = new MapStateDescriptor<>( | |
"MSD", | |
TypeInformation.of(Long.class), | |
TypeInformation.of(new TypeHint<List<String>>() {}) | |
); | |
private static final ConcurrentMap<String, Integer> GLOBAL_COUNTS = new ConcurrentHashMap<>(); | |
static { | |
// what do you know, TTL changes the serializer | |
MSD.enableTimeToLive(StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.hours(1L)) | |
.cleanupInRocksdbCompactFilter(1000L) | |
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) | |
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) | |
.build()); | |
} | |
@Override | |
public void readWindow(String s, Context<TimeWindow> context, Iterable<Pojo> elements, Collector<StateCounts> out) { | |
int dtoCount = 0; | |
for (Pojo ignored : elements) { | |
dtoCount++; | |
} | |
final AtomicInteger stateCount = new AtomicInteger(); | |
GLOBAL_COUNTS.computeIfAbsent(s, (ignored) -> { | |
MapState<Long, List<String>> mapState = context.globalState().getMapState(MSD); | |
int count = 0; | |
try { | |
for (Map.Entry<Long, List<String>> entry : mapState.entries()) { | |
count += entry.getValue().size(); | |
} | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
stateCount.set(count); | |
return count; | |
}); | |
StateCounts stateCounts = new StateCounts(0, stateCount.get(), 1, dtoCount, 0L); | |
out.collect(stateCounts); | |
} | |
} | |
public static class StateReaderFunction3 extends WindowReaderFunction<Pojo, StateCounts, String, TimeWindow> { | |
private static final ListStateDescriptor<StateDTO> LSD = new ListStateDescriptor<>( | |
"LSD2", | |
StateDTO.class | |
); | |
private static final ConcurrentMap<String, Integer> GLOBAL_COUNTS = new ConcurrentHashMap<>(); | |
static { | |
LSD.enableTimeToLive(StateTtlConfig.newBuilder(org.apache.flink.api.common.time.Time.hours(1L)) | |
.cleanupInRocksdbCompactFilter(1000L) | |
.setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) | |
.setStateVisibility(StateTtlConfig.StateVisibility.ReturnExpiredIfNotCleanedUp) | |
.build()); | |
} | |
private transient ListState<StateDTO> listState; | |
@Override | |
public void open(Configuration parameters) throws Exception { | |
super.open(parameters); | |
listState = getRuntimeContext().getListState(LSD); | |
} | |
@Override | |
public void readWindow(String s, Context<TimeWindow> context, Iterable<Pojo> elements, Collector<StateCounts> out) { | |
int dtoCount = 0; | |
for (Pojo ignored : elements) { | |
dtoCount++; | |
} | |
final AtomicInteger stateCount = new AtomicInteger(); | |
GLOBAL_COUNTS.computeIfAbsent(s, (ignored) -> { | |
int count = 0; | |
try { | |
for (StateDTO dto : listState.get()) { | |
count++; | |
} | |
} catch (Exception e) { | |
throw new RuntimeException(e); | |
} | |
stateCount.set(count); | |
return count; | |
}); | |
StateCounts stateCounts = new StateCounts(0, stateCount.get(), 1, dtoCount, 0L); | |
out.collect(stateCounts); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment