Last active
September 30, 2019 19:04
-
-
Save lfmunoz/bb9e78d7bc2a03b2df7a07579c438d3d to your computer and use it in GitHub Desktop.
RabbitMQ
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.databind.ObjectMapper | |
import org.fissore.slf4j.FluentLoggerFactory | |
import org.springframework.amqp.rabbit.listener.ListenerContainerConsumerFailedEvent | |
import org.springframework.beans.factory.annotation.Autowired | |
import org.springframework.context.ApplicationListener | |
import org.springframework.context.annotation.Bean | |
import org.springframework.context.annotation.Configuration | |
import java.util.concurrent.ExecutorService | |
import java.util.concurrent.LinkedBlockingQueue | |
import java.util.concurrent.ThreadPoolExecutor | |
import java.util.concurrent.TimeUnit | |
import com.google.common.util.concurrent.ThreadFactoryBuilder | |
import org.springframework.amqp.rabbit.connection.ConnectionFactory | |
import org.springframework.amqp.rabbit.core.RabbitTemplate | |
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter | |
import org.springframework.amqp.support.converter.MessageConverter | |
/**
 * Shared RabbitMQ infrastructure beans: a listener that logs consumer failures, the executor
 * handed to the Rabbit connection factory, and the Jackson-based message converter.
 */
@Configuration
class GlobalConfig {

    /** Jackson mapper injected by Spring; used to build [proxyMessageConverter]. */
    @Autowired
    lateinit var objectMapper: ObjectMapper

    /**
     * Logs every [ListenerContainerConsumerFailedEvent] at error level, including the failure
     * cause, whether the failure was fatal, the event source and the reported reason.
     */
    @Bean
    fun consumerFailedListener(): ApplicationListener<ListenerContainerConsumerFailedEvent> {
        return ApplicationListener<ListenerContainerConsumerFailedEvent> { event ->
            Log.error()
                .withCause(event.throwable)
                // The first placeholder is deliberately glued to "failure": it expands to
                // either "fatal " or the empty string.
                .log(
                    "Rabbit consumer {}failure from {}: {}",
                    if (event.isFatal) "fatal " else "",
                    event.source,
                    event.reason
                )
        }
    }

    /**
     * Executor for RabbitMQ connections.
     *
     * This executor is configured in the CachingConnectionFactory, passed into the RabbitMQ client
     * when creating the connection, and its threads are used to deliver new messages to the
     * listener container.
     *
     * The pool runs up to [MAX_CONNECTION_THREADS] daemon threads named `rabbitConn-N`. Core and
     * maximum size are equal on purpose: with an unbounded work queue a [ThreadPoolExecutor] never
     * grows beyond its core size, so a smaller core size would silently cap the pool at that
     * value. Idle threads (core threads included) are released after 60 seconds.
     *
     * @return the executor; Spring calls `shutdown` on it when the context is destroyed.
     */
    @Bean(destroyMethod = "shutdown")
    fun connectionExecutorService(): ExecutorService {
        val threadFactory = ThreadFactoryBuilder()
            .setNameFormat("rabbitConn-%s")
            .setDaemon(true)
            .build()
        val executor = ThreadPoolExecutor(
            MAX_CONNECTION_THREADS,
            MAX_CONNECTION_THREADS,
            60L,
            TimeUnit.SECONDS,
            LinkedBlockingQueue(),
            threadFactory,
            // The queue is unbounded, so this handler is only consulted for tasks that are
            // rejected because the executor has been shut down.
            ThreadPoolExecutor.CallerRunsPolicy()
        )
        // Let the pool shrink back to zero threads while the connection is idle.
        executor.allowCoreThreadTimeOut(true)
        return executor
    }

    /** JSON message converter backed by the injected [objectMapper]. */
    @Bean
    fun proxyMessageConverter(): MessageConverter {
        return Jackson2JsonMessageConverter(this.objectMapper)
    }

    companion object {
        private val Log = FluentLoggerFactory.getLogger(GlobalConfig::class.java)

        /** Upper bound on the number of threads in the Rabbit connection executor. */
        private const val MAX_CONNECTION_THREADS = 4
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.io.IOException | |
import java.util.concurrent.ExecutorService | |
import org.fissore.slf4j.FluentLoggerFactory | |
import org.springframework.beans.BeansException | |
import org.springframework.beans.factory.ObjectProvider | |
import org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration | |
import org.springframework.boot.autoconfigure.amqp.RabbitProperties | |
import io.micrometer.core.instrument.Metrics | |
import com.rabbitmq.client.BlockedListener | |
import eco.paragon.MainApp.config.MetricsConfig.Companion.rabbitBlockedName | |
import eco.paragon.MainApp.config.MetricsConfig.Companion.rabbitConnClosedName | |
import eco.paragon.MainApp.config.MetricsConfig.Companion.rabbitConnCreatedName | |
import eco.paragon.MainApp.config.MetricsConfig.Companion.rabbitUnblockedName | |
import org.springframework.amqp.rabbit.connection.* | |
/**
 * Creates a customized [CachingConnectionFactory]: connections carry a fixed name, connection
 * lifecycle and blocked/unblocked events are logged and counted through Micrometer, and the
 * supplied executor is installed on the factory.
 */
class OurRabbitConnectorFactoryCreator : RabbitAutoConfiguration() {
    companion object {
        private val Log = FluentLoggerFactory.getLogger(OurRabbitConnectorFactoryCreator::class.java)

        /**
         * Builds a new instrumented connection factory.
         *
         * @param name connection name; also used as the value of the `name` tag on every counter
         * @param config Rabbit properties the underlying factory is built from
         * @param executor executor installed on the returned factory
         * @throws RuntimeException wrapping any exception raised while building the factory
         */
        fun newConnectionFactory(
            name: String,
            config: RabbitProperties,
            executor: ExecutorService
        ): CachingConnectionFactory {
            return MyRabbitConnectionFactoryCreator().rabbitConnectionFactory(name, config, executor)
        }
    }

    /**
     * Extends `RabbitConnectionFactoryCreator` so that its `rabbitConnectionFactory(config, provider)`
     * can be invoked directly with a hand-made [ObjectProvider].
     */
    private class MyRabbitConnectionFactoryCreator : RabbitConnectionFactoryCreator() {

        /**
         * Delegates to the superclass to build the factory, then attaches the logging/metrics
         * listeners and the executor.
         *
         * @throws RuntimeException wrapping any exception raised while building the factory
         */
        fun rabbitConnectionFactory(
            name: String,
            config: RabbitProperties,
            executor: ExecutorService
        ): CachingConnectionFactory {
            // Each counter must be bound to the metric name that matches the event it counts.
            val rabbitCreated = Metrics.counter(rabbitConnCreatedName, "name", name)
            val rabbitClosed = Metrics.counter(rabbitConnClosedName, "name", name)
            val rabbitBlocked = Metrics.counter(rabbitBlockedName, "name", name)
            val rabbitUnblocked = Metrics.counter(rabbitUnblockedName, "name", name)
            try {
                // Every connection produced by this factory is given the same, caller-chosen name.
                val strategy = ConnectionNameStrategy { name }
                // The superclass expects an ObjectProvider; this one always yields `strategy`.
                val factory = super.rabbitConnectionFactory(config, object : ObjectProvider<ConnectionNameStrategy> {
                    @Throws(BeansException::class)
                    override fun getObject(vararg args: Any): ConnectionNameStrategy {
                        return strategy
                    }

                    @Throws(BeansException::class)
                    override fun getIfAvailable(): ConnectionNameStrategy? {
                        return strategy
                    }

                    @Throws(BeansException::class)
                    override fun getIfUnique(): ConnectionNameStrategy? {
                        return strategy
                    }

                    @Throws(BeansException::class)
                    override fun getObject(): ConnectionNameStrategy {
                        return strategy
                    }
                })
                factory.addConnectionListener(object : ConnectionListener {
                    override fun onCreate(connection: Connection) {
                        Log.info().log("[{}] - Rabbit connection has been created {}",
                            name, connection.localPort.toString())
                        rabbitCreated.increment()
                        // Track blocked/unblocked notifications for each new connection.
                        connection.addBlockedListener(object : BlockedListener {
                            @Throws(IOException::class)
                            override fun handleBlocked(reason: String) {
                                Log.info().log("Rabbit Producer {} has been blocked {}",
                                    connection.localPort.toString(), reason)
                                rabbitBlocked.increment()
                            }

                            @Throws(IOException::class)
                            override fun handleUnblocked() {
                                // Only the local port is logged, so the format has one placeholder.
                                Log.info().log("Rabbit Producer {} has been unblocked",
                                    connection.localPort.toString())
                                rabbitUnblocked.increment()
                            }
                        })
                    }

                    override fun onClose(connection: Connection) {
                        Log.info().log("[{}] - Rabbit connection has been closed {}",
                            name, connection.localPort.toString())
                        rabbitClosed.increment()
                    }
                })
                factory.setExecutor(executor)
                return factory
            } catch (ex: Exception) {
                throw RuntimeException(ex)
            }
        }
    }
} // EOF
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import eco.paragon.MainApp.PUBLISHER_EXCHANGE | |
import eco.paragon.MainApp.rabbitmq.OurRabbitConnectorFactoryCreator.Companion.newConnectionFactory | |
import org.springframework.amqp.core.Exchange | |
import org.springframework.amqp.core.FanoutExchange | |
import org.springframework.amqp.rabbit.AsyncRabbitTemplate | |
import org.springframework.amqp.rabbit.core.RabbitTemplate | |
import org.springframework.amqp.support.converter.MessageConverter | |
import org.springframework.beans.factory.annotation.Autowired | |
import org.springframework.beans.factory.annotation.Qualifier | |
import org.springframework.boot.autoconfigure.amqp.RabbitProperties | |
import org.springframework.context.annotation.Bean | |
import org.springframework.context.annotation.Configuration | |
import java.util.concurrent.ExecutorService | |
/**
 * Publisher-side RabbitMQ beans: the fanout exchange, a [RabbitTemplate] that publishes to it
 * over a dedicated connection factory, and the [ChannelPublisher] wrapping that template.
 */
@Configuration
class PublisherConfig {

    /** Identifier injected from the `instanceId` bean; used as the prefix of the connection name. */
    @Qualifier("instanceId")
    @Autowired
    lateinit var instanceId: String

    /** Executor installed on the publisher's connection factory. */
    @Autowired
    lateinit var connectionExecutorService: ExecutorService

    /** Message converter installed on [publishTemplate]. */
    @Autowired
    lateinit var proxyMessageConverter: MessageConverter

    /** Fanout exchange named [PUBLISHER_EXCHANGE], created with durable = false and autoDelete = false. */
    @Bean
    fun publishExchange(): Exchange {
        return FanoutExchange(PUBLISHER_EXCHANGE, false, false)
    }

    /**
     * Template for basic publishing. It gets its own connection factory, named
     * `<instanceId>#basic-publish`, which is built from a default-constructed [RabbitProperties].
     */
    @Bean
    fun publishTemplate(): RabbitTemplate {
        val properties = RabbitProperties()
        val connection = newConnectionFactory(
            this.instanceId + "#basic-publish",
            properties,
            connectionExecutorService
        )
        val rabbitTemplate = RabbitTemplate(connection)
        rabbitTemplate.messageConverter = proxyMessageConverter
        // Publish to the exchange declared by publishExchange() instead of repeating its name
        // as a string literal that could drift from the constant.
        // NOTE(review): assumes PUBLISHER_EXCHANGE is the exchange previously spelled
        // "rabbit-publish-exchange" here; confirm against the constant's definition.
        rabbitTemplate.setExchange(PUBLISHER_EXCHANGE)
        return rabbitTemplate
    }

    // Avoid Spring Boot features where you can; ChannelPublisher is independent of Spring Boot.
    @Bean
    fun channelPublisher(): ChannelPublisher {
        return ChannelPublisher(publishTemplate())
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment