Skip to content

Instantly share code, notes, and snippets.

@lfmunoz
Last active September 30, 2019 19:04
Show Gist options
  • Save lfmunoz/bb9e78d7bc2a03b2df7a07579c438d3d to your computer and use it in GitHub Desktop.
Save lfmunoz/bb9e78d7bc2a03b2df7a07579c438d3d to your computer and use it in GitHub Desktop.
RabbitMQ
import com.fasterxml.jackson.databind.ObjectMapper
import org.fissore.slf4j.FluentLoggerFactory
import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.context.ApplicationListener
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.concurrent.ExecutorService
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.springframework.amqp.rabbit.connection.ConnectionFactory
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter
import org.springframework.amqp.support.converter.MessageConverter
/**
 * Shared RabbitMQ infrastructure beans: a listener that logs consumer failures, the executor
 * handed to the RabbitMQ connection factories, and the JSON message converter.
 */
@Configuration
class GlobalConfig {
    companion object {
        private val Log = FluentLoggerFactory.getLogger(GlobalConfig::class.java)

        /** Upper bound on the number of threads in the connection executor. */
        private const val CONNECTION_POOL_SIZE = 4

        /** Seconds an idle connection-executor thread is kept before it is released. */
        private const val CONNECTION_THREAD_KEEP_ALIVE_SECONDS = 60L
    }

    /** Jackson mapper injected by Spring; used to build [proxyMessageConverter]. */
    @Autowired
    lateinit var objectMapper: ObjectMapper

    /**
     * Logs every [ListenerContainerConsumerFailedEvent] at error level, including the
     * throwable as the cause, whether the failure is fatal, the event source and the reason.
     */
    @Bean
    fun consumerFailedListener(): ApplicationListener<ListenerContainerConsumerFailedEvent> {
        return ApplicationListener { event ->
            Log.error().withCause(event.throwable).log(
                "Rabbit consumer {}failure from {}: {}",
                if (event.isFatal) "fatal " else "",
                event.source,
                event.reason
            )
        }
    }

    // This executor is configured in the CachingConnectionFactory, passed into the RabbitMQ Client
    // when creating the connection, and its threads are used to deliver new messages to the listener container
    /**
     * Executor backing the RabbitMQ connections; Spring calls `shutdown` on it when the
     * context is destroyed.
     *
     * @return a pool of up to [CONNECTION_POOL_SIZE] daemon threads named `rabbitConn-N`
     */
    @Bean(destroyMethod = "shutdown")
    fun connectionExecutorService(): ExecutorService {
        val threadFactory = ThreadFactoryBuilder()
            .setNameFormat("rabbitConn-%s")
            .setDaemon(true)
            .build()
        // A ThreadPoolExecutor backed by an unbounded queue never grows beyond corePoolSize
        // (tasks are queued instead of spawning threads), so a core size of 1 would leave the
        // pool single-threaded no matter what the maximum is. Core and maximum are therefore
        // both set to the intended ceiling, and idle core threads are allowed to time out so
        // the pool still shrinks when there is no traffic.
        return ThreadPoolExecutor(
            CONNECTION_POOL_SIZE,
            CONNECTION_POOL_SIZE,
            CONNECTION_THREAD_KEEP_ALIVE_SECONDS,
            TimeUnit.SECONDS,
            LinkedBlockingQueue<Runnable>(),
            threadFactory,
            ThreadPoolExecutor.CallerRunsPolicy()
        ).apply { allowCoreThreadTimeOut(true) }
    }

    /** JSON [MessageConverter] built on the injected [objectMapper]. */
    @Bean
    fun proxyMessageConverter(): MessageConverter {
        return Jackson2JsonMessageConverter(this.objectMapper)
    }
}
import java.io.IOException
import java.util.concurrent.ExecutorService
import org.fissore.slf4j.FluentLoggerFactory
import org.springframework.beans.BeansException
import org.springframework.beans.factory.ObjectProvider
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
import org.springframework.boot.autoconfigure.amqp.RabbitProperties
import io.micrometer.core.instrument.Metrics
import com.rabbitmq.client.BlockedListener
import eco.paragon.MainApp.config.MetricsConfig.Companion.rabbitBlockedName
import eco.paragon.MainApp.config.MetricsConfig.Companion.rabbitConnClosedName
import eco.paragon.MainApp.config.MetricsConfig.Companion.rabbitConnCreatedName
import eco.paragon.MainApp.config.MetricsConfig.Companion.rabbitUnblockedName
import org.springframework.amqp.rabbit.connection.*
/**
 * Creates customized [CachingConnectionFactory] instances that carry a fixed connection name,
 * log connection lifecycle / blocked events and count them through Micrometer.
 *
 * NOTE(review): this class extends [RabbitAutoConfiguration] presumably only to bring its nested
 * `RabbitConnectionFactoryCreator` into scope for the private subclass below — confirm.
 */
class OurRabbitConnectorFactoryCreator : RabbitAutoConfiguration() {
    companion object {
        private val Log = FluentLoggerFactory.getLogger(OurRabbitConnectorFactoryCreator::class.java)

        /**
         * Builds a connection factory for the given settings.
         *
         * @param name connection name; also used as the `name` tag on the metrics counters
         * @param config RabbitMQ properties handed to the underlying factory creator
         * @param executor executor installed on the resulting factory
         * @return the configured [CachingConnectionFactory]
         * @throws RuntimeException wrapping any exception raised while building the factory
         */
        fun newConnectionFactory(
            name: String,
            config: RabbitProperties,
            executor: ExecutorService
        ): CachingConnectionFactory {
            return MyRabbitConnectionFactoryCreator().rabbitConnectionFactory(name, config, executor)
        }
    }

    /** Subclass that reuses the parent creator's factory construction and decorates the result. */
    private class MyRabbitConnectionFactoryCreator : RabbitConnectionFactoryCreator() {
        /**
         * Creates the factory via the superclass, then attaches the name strategy, the
         * logging/metrics listeners and the executor.
         */
        fun rabbitConnectionFactory(
            name: String,
            config: RabbitProperties,
            executor: ExecutorService
        ): CachingConnectionFactory {
            // Each counter must be registered under the metric name matching the event it counts.
            val rabbitCreated = Metrics.counter(rabbitConnCreatedName, "name", name)
            val rabbitClosed = Metrics.counter(rabbitConnClosedName, "name", name)
            val rabbitBlocked = Metrics.counter(rabbitBlockedName, "name", name)
            val rabbitUnblocked = Metrics.counter(rabbitUnblockedName, "name", name)
            try {
                // Every connection from this factory gets the caller-supplied name.
                val strategy = ConnectionNameStrategy { name }
                // The parent expects an ObjectProvider; this one always yields the fixed strategy.
                val factory = super.rabbitConnectionFactory(config, object : ObjectProvider<ConnectionNameStrategy> {
                    @Throws(BeansException::class)
                    override fun getObject(vararg args: Any): ConnectionNameStrategy {
                        return strategy
                    }

                    @Throws(BeansException::class)
                    override fun getIfAvailable(): ConnectionNameStrategy? {
                        return strategy
                    }

                    @Throws(BeansException::class)
                    override fun getIfUnique(): ConnectionNameStrategy? {
                        return strategy
                    }

                    @Throws(BeansException::class)
                    override fun getObject(): ConnectionNameStrategy {
                        return strategy
                    }
                })
                factory.addConnectionListener(object : ConnectionListener {
                    override fun onCreate(connection: Connection) {
                        Log.info().log(
                            "[{}] - Rabbit connection has been created {}",
                            name, connection.localPort.toString()
                        )
                        rabbitCreated.increment()
                        // Track broker-side blocking of this connection.
                        connection.addBlockedListener(object : BlockedListener {
                            @Throws(IOException::class)
                            override fun handleBlocked(reason: String) {
                                Log.info().log(
                                    "Rabbit Producer {} has been blocked {}",
                                    connection.localPort.toString(), reason
                                )
                                rabbitBlocked.increment()
                            }

                            @Throws(IOException::class)
                            override fun handleUnblocked() {
                                // Only one argument is available here, so only one placeholder.
                                Log.info().log(
                                    "Rabbit Producer {} has been unblocked",
                                    connection.localPort.toString()
                                )
                                rabbitUnblocked.increment()
                            }
                        })
                    }

                    override fun onClose(connection: Connection) {
                        Log.info().log(
                            "[{}] - Rabbit connection has been closed {}",
                            name, connection.localPort.toString()
                        )
                        rabbitClosed.increment()
                    }
                })
                factory.setExecutor(executor)
                return factory
            } catch (ex: Exception) {
                // Surface any failure as an unchecked exception for callers.
                throw RuntimeException(ex)
            }
        }
    }
} // EOF
import eco.paragon.MainApp.PUBLISHER_EXCHANGE
import eco.paragon.MainApp.rabbitmq.OurRabbitConnectorFactoryCreator.Companion.newConnectionFactory
import org.springframework.amqp.core.Exchange
import org.springframework.amqp.core.FanoutExchange
import org.springframework.amqp.rabbit.AsyncRabbitTemplate
import org.springframework.amqp.rabbit.core.RabbitTemplate
import org.springframework.amqp.support.converter.MessageConverter
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.boot.autoconfigure.amqp.RabbitProperties
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.concurrent.ExecutorService
/**
 * Spring configuration for the publishing side: declares the fanout exchange, a
 * [RabbitTemplate] bound to a dedicated connection factory, and the [ChannelPublisher]
 * built on top of that template.
 */
@Configuration
class PublisherConfig {
    /** Identifier of this application instance; used as the prefix of the connection name. */
    @Autowired
    @Qualifier("instanceId")
    lateinit var instanceId: String

    /** Executor installed on the publisher's connection factory. */
    @Autowired
    lateinit var connectionExecutorService: ExecutorService

    /** Converter applied to messages sent through [publishTemplate]. */
    @Autowired
    lateinit var proxyMessageConverter: MessageConverter

    /** Fanout exchange named by `PUBLISHER_EXCHANGE`, created with durable = false and autoDelete = false. */
    @Bean
    fun publishExchange(): Exchange = FanoutExchange(PUBLISHER_EXCHANGE, false, false)

    /**
     * Template used for publishing. It gets its own connection factory, named
     * `<instanceId>#basic-publish`, built from default [RabbitProperties].
     */
    @Bean
    fun publishTemplate(): RabbitTemplate {
        val connectionFactory = newConnectionFactory(
            "${instanceId}#basic-publish",
            RabbitProperties(),
            connectionExecutorService
        )
        return RabbitTemplate(connectionFactory).also { template ->
            template.messageConverter = proxyMessageConverter
            // NOTE(review): this literal presumably duplicates PUBLISHER_EXCHANGE used by
            // publishExchange() — confirm the values match and consider using the constant.
            template.setExchange("rabbit-publish-exchange")
        }
    }

    // Avoid Spring Boot features where you can; here ChannelPublisher is independent of Spring Boot.
    @Bean
    fun channelPublisher(): ChannelPublisher = ChannelPublisher(publishTemplate())
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment