Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
resilience4j-grpc
import io.github.resilience4j.bulkhead.Bulkhead
import io.github.resilience4j.bulkhead.utils.BulkheadUtils
import io.grpc.CallOptions
import io.grpc.Channel
import io.grpc.ClientCall
import io.grpc.ClientInterceptor
import io.grpc.ClientInterceptors
import io.grpc.ForwardingClientCallListener
import io.grpc.Metadata
import io.grpc.MethodDescriptor
import io.grpc.Status
class BulkheadClientInterceptor(private val bulkhead: Bulkhead) : ClientInterceptor {
override fun <ReqT, RespT> interceptCall(
method: MethodDescriptor<ReqT, RespT>,
callOptions: CallOptions,
next: Channel
): ClientCall<ReqT, RespT> =
object : ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
override fun checkedStart(responseListener: ClientCall.Listener<RespT>, headers: Metadata) {
BulkheadUtils.isCallPermitted(bulkhead)
delegate().start(Listener(responseListener), headers)
}
}
private inner class Listener<RespT>(delegate: ClientCall.Listener<RespT>) :
ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(delegate) {
override fun onClose(status: Status, trailers: Metadata) {
bulkhead.onComplete()
super.onClose(status, trailers)
}
}
}
import io.github.resilience4j.bulkhead.Bulkhead
import io.grpc.ForwardingServerCallListener
import io.grpc.Metadata
import io.grpc.ServerCall
import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor
import io.grpc.Status
class BulkheadServerInterceptor(private val bulkhead: Bulkhead) : ServerInterceptor {
override fun <ReqT, RespT> interceptCall(
call: ServerCall<ReqT, RespT>,
headers: Metadata,
next: ServerCallHandler<ReqT, RespT>
): ServerCall.Listener<ReqT> =
if (bulkhead.isCallPermitted) {
val listener = next.startCall(call, headers)
object : ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(listener) {
override fun onCancel() {
bulkhead.onComplete()
super.onCancel()
}
override fun onComplete() {
bulkhead.onComplete()
super.onComplete()
}
}
} else {
call.close(
Status.UNAVAILABLE
.withDescription("Bulkhead '${bulkhead.name}' is full"),
Metadata()
)
object : ServerCall.Listener<ReqT>() {}
}
}
import io.github.resilience4j.circuitbreaker.CircuitBreaker
import io.github.resilience4j.circuitbreaker.utils.CircuitBreakerUtils
import io.grpc.CallOptions
import io.grpc.Channel
import io.grpc.ClientCall
import io.grpc.ClientInterceptor
import io.grpc.ClientInterceptors
import io.grpc.ForwardingClientCallListener
import io.grpc.Metadata
import io.grpc.MethodDescriptor
import io.grpc.Status
import io.grpc.StatusRuntimeException
class CircuitBreakerClientInterceptor(private val circuitBreaker: CircuitBreaker) : ClientInterceptor {
override fun <ReqT, RespT> interceptCall(
method: MethodDescriptor<ReqT, RespT>,
callOptions: CallOptions,
next: Channel
): ClientCall<ReqT, RespT> =
object : ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
override fun checkedStart(responseListener: ClientCall.Listener<RespT>, headers: Metadata) {
CircuitBreakerUtils.isCallPermitted(circuitBreaker)
delegate().start(Listener(responseListener, System.nanoTime()), headers)
}
}
private inner class Listener<RespT>(delegate: ClientCall.Listener<RespT>, private val startedAt: Long) :
ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(delegate) {
override fun onClose(status: Status, trailers: Metadata) {
val elapsed = System.nanoTime() - startedAt
if (status.isOk) {
circuitBreaker.onSuccess(elapsed)
} else {
circuitBreaker.onError(elapsed, StatusRuntimeException(status, trailers))
}
super.onClose(status, trailers)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment