Skip to content

Instantly share code, notes, and snippets.

@cy6erGn0m
Created November 21, 2018 14:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cy6erGn0m/8213cf6a213aaa5a6194bc4203738eec to your computer and use it in GitHub Desktop.
Save cy6erGn0m/8213cf6a213aaa5a6194bc4203738eec to your computer and use it in GitHub Desktop.
Simplified async to blocking adapter
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import java.util.concurrent.locks.*
import kotlin.coroutines.*
/**
 * Dispatcher used as the coroutine context of the adapter coroutine created in [JavaBlockingQueue].
 *
 * Placeholder only: `TODO()` throws `NotImplementedError` when this property is initialized, so a
 * real dispatcher has to be substituted before the example can run.
 */
private val MyEventLoop: CoroutineDispatcher = TODO()
/**
 * Simplified example of an adapter that lets a blocking caller push elements into a
 * coroutine [SendChannel].
 *
 * NEVER use this in production: it only illustrates the idea and is not a complete implementation.
 *
 * How it works: [send] first tries the non-suspending `offer`. If that fails, the element is handed
 * to a coroutine running [loop], which performs the suspending `send`, while the calling thread
 * parks until the coroutine clears [element].
 *
 * NOTE(review): the completion passed to `createCoroutine` ignores its result, so if [loop] ever
 * terminates with an exception (for example one thrown by `coroutinesChannel.send`) the failure is
 * dropped, [element] is never cleared, and a thread parked in [rendezvousBlocking] stays parked.
 */
class JavaBlockingQueue<E : Any>(private val coroutinesChannel: SendChannel<E>) {
    /** Thread currently waiting in [rendezvousBlocking], or `null` when no thread is waiting. */
    @Volatile
    private var parkedThread: Thread? = null

    /**
     * Continuation that starts or resumes [loop]. Initially the not-yet-started coroutine; replaced
     * by [rendezvous] after every hand-off. Volatile so that a value published by the coroutine is
     * visible to the blocking caller even when the two run on different threads.
     */
    @Volatile
    @UseExperimental(ExperimentalCoroutinesApi::class)
    private var continuation: Continuation<Unit> =
        suspend { loop() }.createCoroutine(Continuation(MyEventLoop) {
        })

    /**
     * Element being handed from the blocking caller to [loop]; reset to `null` once it has been
     * delivered. Volatile because [rendezvousBlocking] polls it as its wake-up condition while
     * [rendezvous] clears it; without that, the caller could keep seeing a stale value and park
     * after the only `unpark` has already been missed.
     */
    @Volatile
    private var element: E? = null

    /**
     * Sends [element] to the channel, blocking the calling thread if the channel cannot accept it
     * immediately.
     *
     * Usage constraints stated by the original author:
     * - should be only called from MyEventLoop
     * - should be never called concurrently
     */
    fun send(element: E) {
        // Fast path: the channel accepted the element without suspension.
        if (coroutinesChannel.offer(element)) return
        sendBlocking(element)
    }

    /**
     * Body of the adapter coroutine: delivers the pending [element] with the suspending `send`,
     * then waits in [rendezvous] for the next hand-off. Never returns normally.
     */
    private suspend fun loop() {
        while (true) { // every iteration starts right after sendBlocking resumed the coroutine
            val pending = checkNotNull(element) { "loop() resumed without a pending element" }
            // If this suspends, control returns to sendBlocking, which goes on to park its thread.
            coroutinesChannel.send(pending)
            // Release the blocked caller and wait for the next sendBlocking.
            rendezvous()
        }
    }

    /**
     * Suspends [loop] until the next [sendBlocking] call and releases the current caller.
     *
     * Order matters: the new continuation is published first, and only then is [element] cleared
     * and the parked thread woken. A released caller may immediately call [sendBlocking] again and
     * must find the fresh continuation there, not the previous, already resumed one.
     */
    private suspend fun rendezvous(): Unit = suspendCancellableCoroutine { next ->
        continuation = next
        element = null // delivery finished: this is the condition the parked thread waits for
        signal()
    }

    /** Unparks the thread recorded in [parkedThread], if any. */
    private fun signal() {
        parkedThread?.let { LockSupport.unpark(it) }
    }

    /** Slow path of [send]: hands [element] to the coroutine and blocks until it is delivered. */
    private fun sendBlocking(element: E) {
        this.element = element
        continuation.resume(Unit) // start or continue loop()
        rendezvousBlocking(element)
    }

    /**
     * Parks the current thread until [loop] has cleared the hand-off slot for [element].
     *
     * The condition is re-checked in a loop after every `park()` return, so an early or leftover
     * wake-up cannot end the wait before the slot is cleared.
     */
    private fun rendezvousBlocking(element: E) {
        // Register before checking the condition so a concurrent signal() can find this thread.
        parkedThread = Thread.currentThread()
        while (this.element === element) {
            LockSupport.park()
        }
        parkedThread = null
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment