Skip to content

Instantly share code, notes, and snippets.

@ScottPierce
Last active April 1, 2020 03:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ScottPierce/0e687021f34d779708d1f32bfc81e8e2 to your computer and use it in GitHub Desktop.
Save ScottPierce/0e687021f34d779708d1f32bfc81e8e2 to your computer and use it in GitHub Desktop.
MutabilityException
package com.example.abtest
import co.touchlab.stately.freeze
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.newSingleThreadContext
import kotlinx.coroutines.runBlocking
import kotlin.test.Test
class InitTest {
@Test
fun test1() {
runBlocking(Contexts.context2) {
flowCallbackWrapper {
println("Callback: $it")
}
}
runBlocking(Contexts.context1) {
delay(5000)
}
}
@Test
fun test2() {
runBlocking(Contexts.context1) {
flow1().collect {
println("Flow: $it")
}
}
}
fun flowCallbackWrapper(
callback: (value: String) -> Unit
) {
callback.freeze()
CoroutineScope(Contexts.context2).launch {
flow1().collect {
callback(it)
}
}
}
fun flow1(): Flow<String> = flow {
var isEmpty = true
emitAll(
AsyncExample().getData()
.onEach { isEmpty = false }
.onCompletion {
if (isEmpty) {
emit("Default")
}
}
)
}
}
object Contexts {
val context1 = newSingleThreadContext("1")
val context2 = newSingleThreadContext("2")
}
class AsyncExample {
private val channel = Channel<suspend (delegate: Example) -> Unit>()
init {
CoroutineScope(Contexts.context2).launch {
val delegate = Example()
channel.receiveAsFlow().collect {
it(delegate)
}
}
freeze()
}
suspend fun getData(): Flow<String> = flow {
channel.send {
emitAll(it.getData())
}
}.flowOn(Contexts.context2)
}
class Example {
fun getData(): Flow<String> = flow {
}
}
@ScottPierce
Copy link
Author

test1 and test2 are driven by the same code, except test1 wraps a suspend function with a callback. Why does test1 have this exception, but test2 doesn't?

If I comment out test2 and run `test1, I'll see the following exception in the standard output, but the test still passes:

Uncaught Kotlin exception: kotlinx.coroutines.CoroutinesInternalError: Fatal exception in coroutines machinery for CancellableContinuation(Shareable[used]){kotlin.Unit}@2929dcd8. Please read KDoc to 'handleFatalException' method and report this incident to maintainers
        at 0   test.kexe                           0x0000000106c19903 kfun:kotlin.Error.<init>(kotlin.String?;kotlin.Throwable?)kotlin.Error + 115 (/Users/teamcity1/teamcity_work/4d622a065c544371/runtime/src/main/kotlin/kotlin/Exceptions.kt:14:63)
        at 1   test.kexe                           0x0000000106dafd4c kfun:kotlinx.coroutines.CoroutinesInternalError.<init>(kotlin.String;kotlin.Throwable)kotlinx.coroutines.CoroutinesInternalError + 124 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/Exceptions.common.kt:28:77)
        at 2   test.kexe                           0x0000000106e1a27c kfun:kotlinx.coroutines.DispatchedTask.handleFatalException$kotlinx-coroutines-core(kotlin.Throwable?;kotlin.Throwable?) + 940 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt:93:22)
        at 3   test.kexe                           0x0000000106e19e6a kfun:kotlinx.coroutines.DispatchedTask.run() + 3226 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt:64:13)
        at 4   test.kexe                           0x0000000106dab0d6 kfun:kotlinx.coroutines.EventLoopImplBase.processNextEvent()kotlin.Long + 822 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/EventLoop.common.kt:272:20)
        at 5   test.kexe                           0x0000000106e3296d kfun:kotlinx.coroutines.runEventLoop$kotlinx-coroutines-core(kotlinx.coroutines.EventLoop?;kotlin.Function0<kotlin.Boolean>) + 829 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/Builders.kt:80:40)
        at 6   test.kexe                           0x0000000106e3b81a kfun:kotlinx.coroutines.WorkerCoroutineDispatcherImpl.start$lambda-0#internal + 410 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/Workers.kt:48:0)
        at 7   test.kexe                           0x0000000106e3b9fe kfun:kotlinx.coroutines.WorkerCoroutineDispatcherImpl.$start$lambda-0$FUNCTION_REFERENCE$33.invoke#internal + 62 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/Workers.kt:47:24)
        at 8   test.kexe                           0x0000000106e3ba5e kfun:kotlinx.coroutines.WorkerCoroutineDispatcherImpl.$start$lambda-0$FUNCTION_REFERENCE$33.$<bridge-UNN>invoke()#internal + 62 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/Workers.kt:47:24)
        at 9   test.kexe                           0x0000000106c4f121 WorkerLaunchpad + 177 (/Users/teamcity1/teamcity_work/4d622a065c544371/runtime/src/main/kotlin/kotlin/native/concurrent/Internal.kt:67:54)
        at 10  test.kexe                           0x0000000106d72c19 _ZN6Worker19processQueueElementEb + 2569
        at 11  test.kexe                           0x0000000106d731c6 _ZN12_GLOBAL__N_113workerRoutineEPv + 54
        at 12  libsystem_pthread.dylib             0x00007fff6af12109 _pthread_start + 148
        at 13  libsystem_pthread.dylib             0x00007fff6af0db8b thread_start + 15
Caused by: kotlin.native.concurrent.InvalidMutabilityException: mutation attempt of frozen kotlinx.coroutines.flow.internal.SafeCollector@d6d61628
        at 0   test.kexe                           0x0000000106c20787 kfun:kotlin.Throwable.<init>(kotlin.String?)kotlin.Throwable + 87 (/Users/teamcity1/teamcity_work/4d622a065c544371/runtime/src/main/kotlin/kotlin/Throwable.kt:22:37)
        at 1   test.kexe                           0x0000000106c19a05 kfun:kotlin.Exception.<init>(kotlin.String?)kotlin.Exception + 85 (/Users/teamcity1/teamcity_work/4d622a065c544371/runtime/src/main/kotlin/kotlin/Exceptions.kt:23:44)
        at 2   test.kexe                           0x0000000106c19ba5 kfun:kotlin.RuntimeException.<init>(kotlin.String?)kotlin.RuntimeException + 85 (/Users/teamcity1/teamcity_work/4d622a065c544371/runtime/src/main/kotlin/kotlin/Exceptions.kt:34:44)
        at 3   test.kexe                           0x0000000106c4dcd5 kfun:kotlin.native.concurrent.InvalidMutabilityException.<init>(kotlin.String)kotlin.native.concurrent.InvalidMutabilityException + 85 (/Users/teamcity1/teamcity_work/4d622a065c544371/runtime/src/main/kotlin/kotlin/native/concurrent/Freezing.kt:22:60)
        at 4   test.kexe                           0x0000000106c4f558 ThrowInvalidMutabilityException + 680 (/Users/teamcity1/teamcity_work/4d622a065c544371/runtime/src/main/kotlin/kotlin/native/concurrent/Internal.kt:90:11)
        at 5   test.kexe                           0x0000000106d5a098 MutationCheck + 104
        at 6   test.kexe                           0x0000000106e3e066 kfun:kotlinx.coroutines.flow.internal.SafeCollector.<set-lastEmissionContext>#internal + 102 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt:17:13)
        at 7   test.kexe                           0x0000000106e3e302 kfun:kotlinx.coroutines.flow.internal.SafeCollector.emit(T) + 610 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt:23:13)
        at 8   test.kexe                           0x0000000106e3e2c3 kfun:kotlinx.coroutines.flow.internal.SafeCollector.emit(T) + 547 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/flow/internal/SafeCollector.kt:25:19)
        at 9   test.kexe                           0x0000000106bc47a6 kfun:com.example.abtest.InitTest.$flow1$lambda-6$lambda-5COROUTINE$4.invokeSuspend#internal + 774 (/Users/scott.pierce/workspace/Rally/ab-testing-firebase/client/src/macosX64Test/kotlin/com/example/abtest/InitTest.kt:63:25)
        at 10  test.kexe                           0x0000000106bc4d07 kfun:com.example.abtest.InitTest.$flow1$lambda-6$lambda-5COROUTINE$4.invoke#internal + 327 (/Users/scott.pierce/workspace/Rally/ab-testing-firebase/client/src/macosX64Test/kotlin/com/example/abtest/InitTest.kt:61:31)
        at 11  test.kexe                           0x0000000106e0f209 kfun:kotlinx.coroutines.flow.$invokeSafelyCOROUTINE$171.invokeSuspend#internal + 745 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt:175:9)
        at 12  test.kexe                           0x0000000106e0f579 kfun:kotlinx.coroutines.flow.invokeSafely#internal + 393 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt:170:17)
        at 13  test.kexe                           0x0000000106e10466 kfun:kotlinx.coroutines.flow.<no name provided>_1.$collect_2COROUTINE$176.invokeSuspend#internal + 2614 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/flow/operators/Emitters.kt:121:0)
        at 14  test.kexe                           0x0000000106c406dc kfun:kotlin.coroutines.native.internal.BaseContinuationImpl.resumeWith(kotlin.Result<kotlin.Any?>) + 700 (/Users/teamcity1/teamcity_work/4d622a065c544371/runtime/src/main/kotlin/kotlin/coroutines/ContinuationImpl.kt:26:0)
        at 15  test.kexe                           0x0000000106e19c2d kfun:kotlinx.coroutines.DispatchedTask.run() + 2653 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/internal/DispatchedTask.kt:42:0)
        at 16  test.kexe                           0x0000000106dab0d6 kfun:kotlinx.coroutines.EventLoopImplBase.processNextEvent()kotlin.Long + 822 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/common/src/EventLoop.common.kt:272:20)
        at 17  test.kexe                           0x0000000106e3296d kfun:kotlinx.coroutines.runEventLoop$kotlinx-coroutines-core(kotlinx.coroutines.EventLoop?;kotlin.Function0<kotlin.Boolean>) + 829 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/Builders.kt:80:40)
        at 18  test.kexe                           0x0000000106e3b81a kfun:kotlinx.coroutines.WorkerCoroutineDispatcherImpl.start$lambda-0#internal + 410 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/Workers.kt:48:0)
        at 19  test.kexe                           0x0000000106e3b9fe kfun:kotlinx.coroutines.WorkerCoroutineDispatcherImpl.$start$lambda-0$FUNCTION_REFERENCE$33.invoke#internal + 62 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/Workers.kt:47:24)
        at 20  test.kexe                           0x0000000106e3ba5e kfun:kotlinx.coroutines.WorkerCoroutineDispatcherImpl.$start$lambda-0$FUNCTION_REFERENCE$33.$<bridge-UNN>invoke()#internal + 62 (/opt/buildAgent/work/44ec6e850d5c63f0/kotlinx-coroutines-core/native/src/Workers.kt:47:24)
        at 21  test.kexe                           0x0000000106c4f121 WorkerLaunchpad + 177 (/Users/teamcity1/teamcity_work/4d622a065c544371/runtime/src/main/kotlin/kotlin/native/concurrent/Internal.kt:67:54)
        at 22  test.kexe                           0x0000000106d72c19 _ZN6Worker19processQueueElementEb + 2569
        at 23  test.kexe                           0x0000000106d731c6 _ZN12_GLOBAL__N_113workerRoutineEPv + 54
        at 24  libsystem_pthread.dylib             0x00007fff6af12109 _pthread_start + 148
        at 25  libsystem_pthread.dylib             0x00007fff6af0db8b thread_start + 15

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment