Skip to content

Instantly share code, notes, and snippets.

@hsyed
Created June 26, 2018 18:46
Show Gist options
  • Save hsyed/c27fc7089a1f84eed77c844100c26d8a to your computer and use it in GitHub Desktop.
Save hsyed/c27fc7089a1f84eed77c844100c26d8a to your computer and use it in GitHub Desktop.
coroutine binding glue.
package com.axsy.base.grpc.stub
import io.grpc.stub.StreamObserver
import kotlinx.coroutines.experimental.CompletableDeferred
import kotlinx.coroutines.experimental.Deferred
import kotlinx.coroutines.experimental.Job
import kotlinx.coroutines.experimental.channels.LinkedListChannel
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.SendChannel
import kotlinx.coroutines.experimental.launch
import kotlin.coroutines.experimental.CoroutineContext
object KtStubs {
@JvmStatic
fun <E> connectChannelToObserver(
context: CoroutineContext,
channel: ReceiveChannel<E>,
observer: StreamObserver<E>
): Job = launch(context) {
try {
for (value in channel) {
observer.onNext(value)
}
observer.onCompleted()
} catch (t: Throwable) {
observer.onError(t)
}
}
@JvmStatic
fun <E> connectDeferredToObserver(
context: CoroutineContext,
deferred: Deferred<E>,
observer: StreamObserver<E>
): Job = launch(context) {
try {
observer.onNext(deferred.await())
observer.onCompleted()
} catch (t: Throwable) {
observer.onError(t)
}
}
}
data class ManyToOneCall<TRequest, TResponse>(
private val request: SendChannel<TRequest>,
private val response: Deferred<TResponse>
) : SendChannel<TRequest> by request, Deferred<TResponse> by response
data class ManyToManyCall<TRequest, TResponse>(
private val request: SendChannel<TRequest>,
private val response: ReceiveChannel<TResponse>
) : SendChannel<TRequest> by request, ReceiveChannel<TResponse> by response
class StreamObserverDeferred<E> : StreamObserver<E>, CompletableDeferred<E> by CompletableDeferred() {
override fun onNext(value: E) { complete(value) }
override fun onError(t: Throwable) { completeExceptionally(t) }
override fun onCompleted() { /* nothing */ }
}
// Should this be inheriting from a LinkedListChannel ? what are the backpressure consequences ?
class StreamObserverChannel<E> : LinkedListChannel<E>(), StreamObserver<E> {
override fun onNext(value: E) { offer(value) }
override fun onError(t: Throwable?) { close(cause = t) }
override fun onCompleted() { close(cause = null) }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment