Skip to content

Instantly share code, notes, and snippets.

@Zhambul
Created July 24, 2019 19:14
Show Gist options
  • Save Zhambul/5247e52d22b4c5b81b0dffa027c2d1c4 to your computer and use it in GitHub Desktop.
Save Zhambul/5247e52d22b4c5b81b0dffa027c2d1c4 to your computer and use it in GitHub Desktop.
/**
 * A [SimpleRabbitListenerContainerFactory] that decorates the handler-method factory of
 * method-based listener endpoints before the listener container is created.
 *
 * For a [MethodRabbitListenerEndpoint] the endpoint's current `messageHandlerMethodFactory`
 * field is read reflectively, wrapped in a [ReactiveMessageHandlerMethodFactory] and set back
 * on the endpoint. Any other endpoint (including `null`) is passed to the superclass untouched.
 */
class ReactiveRabbitListenerContainerFactory : SimpleRabbitListenerContainerFactory() {

    override fun createListenerContainer(endpoint: RabbitListenerEndpoint?): SimpleMessageListenerContainer {
        (endpoint as? MethodRabbitListenerEndpoint)?.let { methodEndpoint ->
            // The field is read via reflection by name; the name must match the endpoint's field.
            val delegate: MessageHandlerMethodFactory = getField(methodEndpoint, "messageHandlerMethodFactory")
            methodEndpoint.setMessageHandlerMethodFactory(ReactiveMessageHandlerMethodFactory(delegate))
        }
        return super.createListenerContainer(endpoint)
    }
}
/**
 * A [MessageHandlerMethodFactory] decorator: every call is delegated to [t], except that the
 * handler method produced by [createInvocableHandlerMethod] is re-wrapped in a
 * [BlockingHandlerMethod].
 */
class ReactiveMessageHandlerMethodFactory(private val t: MessageHandlerMethodFactory) : MessageHandlerMethodFactory by t {

    /** Creates the handler method through the delegate and returns its blocking wrapper. */
    override fun createInvocableHandlerMethod(bean: Any, method: Method): InvocableHandlerMethod =
        createBlockingWrapper(t.createInvocableHandlerMethod(bean, method))

    /**
     * Builds a [BlockingHandlerMethod] from [orig] and carries over the values of the original's
     * `resolvers` and `parameterNameDiscoverer` fields, which are read reflectively by name.
     */
    private fun createBlockingWrapper(orig: InvocableHandlerMethod): BlockingHandlerMethod {
        // Read both fields before constructing the wrapper, so a reflection failure surfaces first.
        val argumentResolvers: HandlerMethodArgumentResolverComposite = getField(orig, "resolvers")
        val nameDiscoverer: ParameterNameDiscoverer = getField(orig, "parameterNameDiscoverer")

        val wrapper = BlockingHandlerMethod(orig)
        wrapper.setMessageMethodArgumentResolvers(argumentResolvers)
        wrapper.setParameterNameDiscoverer(nameDiscoverer)
        return wrapper
    }
}
/**
 * An [InvocableHandlerMethod] that waits for reactive handler results.
 *
 * When the underlying invocation returns a [Mono], a freshly generated random UUID string is
 * stored in its subscriber context under [MdcWebFilter.REQUEST_ID] and the calling thread blocks
 * until the [Mono] terminates.
 *
 * NOTE(review): `doInvoke` always returns `null`, so neither the value emitted by the [Mono] nor
 * a non-[Mono] return value is handed back to the caller. Confirm this is intended.
 */
class BlockingHandlerMethod(t: InvocableHandlerMethod) : InvocableHandlerMethod(t) {

    override fun doInvoke(vararg args: Any?): Any? {
        // Anything that is not a Mono is ignored and the invocation yields null.
        val pending = super.doInvoke(*args) as? Mono<*> ?: return null

        val withRequestId = pending.subscriberContext { context ->
            context.put(MdcWebFilter.REQUEST_ID, UUID.randomUUID().toString())
        }
        // Block the calling thread until the Mono finishes; the emitted value is discarded.
        withRequestId.block()
        return null
    }
}
/**
 * Reads the value of the field named [fieldName] from [target] using reflection.
 *
 * The field is looked up with `ReflectionUtils.findRequiredField` starting from the runtime
 * class of [target], made accessible, and its value is returned cast to [T]. The cast is
 * unchecked: the caller is responsible for requesting the field's actual type.
 *
 * @param target the object whose field is read
 * @param fieldName the name of the field to read
 * @return the non-null field value, cast to [T]
 * @throws IllegalStateException if the field cannot be made accessible or its value is `null`
 */
@Suppress("UNCHECKED_CAST")
private fun <T : Any> getField(target: Any, fieldName: String): T {
    val field = ReflectionUtils.findRequiredField(target.javaClass, fieldName)

    // trySetAccessible() reports failure through its return value instead of throwing,
    // so it has to be checked explicitly to fail with a meaningful message.
    check(field.trySetAccessible()) {
        "Cannot make field '$fieldName' of ${target.javaClass.name} accessible"
    }

    // T is declared non-null; reject a null field value here rather than leaking it to callers.
    val value = checkNotNull(field.get(target)) {
        "Field '$fieldName' of ${target.javaClass.name} is null"
    }
    return value as T
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment