Skip to content

Instantly share code, notes, and snippets.

@fvigotti
Created July 2, 2019 07:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fvigotti/9edaa94546f5ecb3e354bfc180b91b91 to your computer and use it in GitHub Desktop.
Save fvigotti/9edaa94546f5ecb3e354bfc180b91b91 to your computer and use it in GitHub Desktop.
investigating deadlock in kotlin
package net.FluxDeadLockInvestigation
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.reactor.asFlux
import org.junit.jupiter.api.Test
import reactor.core.scheduler.Schedulers
class FluxDeadLockInvestigation(){
@Test
fun deadlockInvstigation() = runBlocking<Unit>{
(0..3000).map { async {
//noDeadLock()
yesDeadLock()
}}.forEach { it.await()}
}
val thPool = newSingleThreadContext("single")
suspend fun yesDeadLock(){
withContext(Dispatchers.IO){
var channel = Channel<Int>(Channel.RENDEZVOUS)
val producer =
launch(newSingleThreadContext("nconte")){
while (isActive && !channel.isClosedForSend && !channel.isClosedForReceive ){
try{
channel.send(1)
}catch (t:Throwable){
}
}
}
withContext(Dispatchers.Default){
// withContext(thPool){ // <<-- with any other context, even a single thread, this long cause deadlocks
channel.asFlux()
.publishOn(Schedulers.elastic(),1)
.doFinally { producer.cancel() }
.limitRate(1)
.take(30)
.blockLast()
}
1
}
}
}
@fvigotti
Copy link
Author

fvigotti commented Jul 2, 2019

even a more minimal demonstration of the issue

class FluxDeadLockInvestigation(){

    @Test
    fun deadlockInvstigation() = runBlocking<Unit>(Dispatchers.IO){
        (0..3000).map { async {
            yesDeadLock()
        }}.forEach { it.await()}
    }
    val thPool = newSingleThreadContext("single")
    val thPool2 = newFixedThreadPoolContext(8,"single")
    suspend fun yesDeadLock(){
        withContext(Dispatchers.IO){
            var channel = Channel<Int>(Channel.RENDEZVOUS)
            val producer =
                launch(newSingleThreadContext("nconte")){
                    while (isActive  && !channel.isClosedForSend && !channel.isClosedForReceive ){
                        try{
                            channel.send(1)
                            cancel()
                        }catch (t:Throwable){
                        }
                    }
                }

            withContext(Dispatchers.Default){ // <<--- DEADLOCK
                //withContext(Dispatchers.IO){  // <<-- NO DEADLOCK
                //withContext(thPool){  // <<-- NO DEADLOCK
                //withContext(thPool2){  // <<-- NO DEADLOCK
                channel.asFlux()
                    //.subscribeOn(Schedulers.elastic())
                    .publishOn(Schedulers.elastic(),1)
                    //.doFinally {  producer.cancel() }
                    .limitRate(1)
                    //.take(30)
                    .take(1)
                    .blockLast()
            }
            1
        }
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment