resilience4j-grpc
import io.github.resilience4j.bulkhead.Bulkhead | |
import io.github.resilience4j.bulkhead.utils.BulkheadUtils | |
import io.grpc.CallOptions | |
import io.grpc.Channel | |
import io.grpc.ClientCall | |
import io.grpc.ClientInterceptor | |
import io.grpc.ClientInterceptors | |
import io.grpc.ForwardingClientCallListener | |
import io.grpc.Metadata | |
import io.grpc.MethodDescriptor | |
import io.grpc.Status | |
class BulkheadClientInterceptor(private val bulkhead: Bulkhead) : ClientInterceptor { | |
override fun <ReqT, RespT> interceptCall( | |
method: MethodDescriptor<ReqT, RespT>, | |
callOptions: CallOptions, | |
next: Channel | |
): ClientCall<ReqT, RespT> = | |
object : ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { | |
override fun checkedStart(responseListener: ClientCall.Listener<RespT>, headers: Metadata) { | |
BulkheadUtils.isCallPermitted(bulkhead) | |
delegate().start(Listener(responseListener), headers) | |
} | |
} | |
private inner class Listener<RespT>(delegate: ClientCall.Listener<RespT>) : | |
ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(delegate) { | |
override fun onClose(status: Status, trailers: Metadata) { | |
bulkhead.onComplete() | |
super.onClose(status, trailers) | |
} | |
} | |
} |
import io.github.resilience4j.bulkhead.Bulkhead | |
import io.grpc.ForwardingServerCallListener | |
import io.grpc.Metadata | |
import io.grpc.ServerCall | |
import io.grpc.ServerCallHandler | |
import io.grpc.ServerInterceptor | |
import io.grpc.Status | |
class BulkheadServerInterceptor(private val bulkhead: Bulkhead) : ServerInterceptor { | |
override fun <ReqT, RespT> interceptCall( | |
call: ServerCall<ReqT, RespT>, | |
headers: Metadata, | |
next: ServerCallHandler<ReqT, RespT> | |
): ServerCall.Listener<ReqT> = | |
if (bulkhead.isCallPermitted) { | |
val listener = next.startCall(call, headers) | |
object : ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(listener) { | |
override fun onCancel() { | |
bulkhead.onComplete() | |
super.onCancel() | |
} | |
override fun onComplete() { | |
bulkhead.onComplete() | |
super.onComplete() | |
} | |
} | |
} else { | |
call.close( | |
Status.UNAVAILABLE | |
.withDescription("Bulkhead '${bulkhead.name}' is full"), | |
Metadata() | |
) | |
object : ServerCall.Listener<ReqT>() {} | |
} | |
} |
import io.github.resilience4j.circuitbreaker.CircuitBreaker | |
import io.github.resilience4j.circuitbreaker.utils.CircuitBreakerUtils | |
import io.grpc.CallOptions | |
import io.grpc.Channel | |
import io.grpc.ClientCall | |
import io.grpc.ClientInterceptor | |
import io.grpc.ClientInterceptors | |
import io.grpc.ForwardingClientCallListener | |
import io.grpc.Metadata | |
import io.grpc.MethodDescriptor | |
import io.grpc.Status | |
import io.grpc.StatusRuntimeException | |
class CircuitBreakerClientInterceptor(private val circuitBreaker: CircuitBreaker) : ClientInterceptor { | |
override fun <ReqT, RespT> interceptCall( | |
method: MethodDescriptor<ReqT, RespT>, | |
callOptions: CallOptions, | |
next: Channel | |
): ClientCall<ReqT, RespT> = | |
object : ClientInterceptors.CheckedForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) { | |
override fun checkedStart(responseListener: ClientCall.Listener<RespT>, headers: Metadata) { | |
CircuitBreakerUtils.isCallPermitted(circuitBreaker) | |
delegate().start(Listener(responseListener, System.nanoTime()), headers) | |
} | |
} | |
private inner class Listener<RespT>(delegate: ClientCall.Listener<RespT>, private val startedAt: Long) : | |
ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(delegate) { | |
override fun onClose(status: Status, trailers: Metadata) { | |
val elapsed = System.nanoTime() - startedAt | |
if (status.isOk) { | |
circuitBreaker.onSuccess(elapsed) | |
} else { | |
circuitBreaker.onError(elapsed, StatusRuntimeException(status, trailers)) | |
} | |
super.onClose(status, trailers) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment