Skip to content

Instantly share code, notes, and snippets.

@asardaes
Created October 14, 2022 10:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save asardaes/b804b7ed04ace176881189c3d1cf842a to your computer and use it in GitHub Desktop.
Save asardaes/b804b7ed04ace176881189c3d1cf842a to your computer and use it in GitHub Desktop.
Flink test for broadcast state
@Test
fun `snapshot and subsequent restore`() {
    // Phase 1: run a non-empty function (it knows about active customers), feed it one
    // broadcast element and take a snapshot of the operator state.
    val populatedFunction = getFunctionForTesting()
    val (snapshotOperatorId, snapshotFinalizer) = getHarness(populatedFunction).use { sourceHarness ->
        sourceHarness.processBroadcastElement(GenericChangeMessage(), 1L)
        val snapshot = sourceHarness.snapshotWithLocalState(0L, 1L)
        assertEquals(1, snapshot.jobManagerOwnedState.managedOperatorState.size)
        sourceHarness.streamConfig.operatorID to snapshot
    }

    // Phase 2: restore the snapshot into a fresh operator.
    // The harness constructor is invoked directly because
    // ProcessFunctionTestHarnesses.forBroadcastProcessFunction calls open with empty state,
    // which is not wanted here.
    val restoringOperator = CoBroadcastWithNonKeyedOperator(
        // deliberately empty function, so anything it knows must have been read from state
        MyFunction(emptyList()),
        listOf(ServicesCacheFlinkBridge.BROADCAST_STATE_DESCRIPTOR)
    )
    BroadcastOperatorTestHarness(restoringOperator, 1, 1, 0).use { restoreHarness ->
        // Reuse the operator id of the harness that produced the snapshot.
        restoreHarness.streamConfig.operatorID = snapshotOperatorId

        // The API offers no way to set the attempt number, so for now the environment's
        // private "taskInfo" field is swapped out via reflection.
        val environment = restoreHarness.environment
        val taskInfoField = environment.javaClass.getDeclaredField("taskInfo")
        taskInfoField.isAccessible = true
        val currentTaskInfo = environment.taskInfo
        val patchedTaskInfo = TaskInfo(
            currentTaskInfo.taskName,
            currentTaskInfo.maxNumberOfParallelSubtasks,
            currentTaskInfo.indexOfThisSubtask,
            currentTaskInfo.numberOfParallelSubtasks,
            1, // the only value that differs from the current task info
            currentTaskInfo.allocationIDAsString
        )
        taskInfoField.set(environment, patchedTaskInfo)

        // State must be initialized from the snapshot before the operator is opened.
        restoreHarness.initializeState(snapshotFinalizer.jobManagerOwnedState)
        restoreHarness.open()

        val event = getEvent()
        restoreHarness.processElement(event, 1L)
        assertEquals(4, restoreHarness.output.size)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment