Skip to content

Instantly share code, notes, and snippets.

@lfmunoz
Created July 13, 2020 19:00
Show Gist options
  • Save lfmunoz/7a8367d99c966f95174a7676ae4ec50b to your computer and use it in GitHub Desktop.
Save lfmunoz/7a8367d99c966f95174a7676ae4ec50b to your computer and use it in GitHub Desktop.
Rabbit Publisher
package eco.analytics.rabbit
import com.rabbitmq.client.*
import io.vertx.core.json.JsonObject
import io.vertx.rabbitmq.RabbitMQOptions
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.flow
import org.fissore.slf4j.FluentLoggerFactory
import java.io.IOException
import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.concurrent.NotThreadSafe
/**
* https://www.rabbitmq.com/java-client.html
* https://www.rabbitmq.com/api-guide.html
*
* Publisher Confirms:
* https://www.rabbitmq.com/tutorials/tutorial-seven-java.html
*/
class RabbitPublisherBare(
private val config_uri: String
) {
private val log = FluentLoggerFactory.getLogger(RabbitPublisherBare::class.java)
lateinit var exchangeName: String
lateinit var uri: String
val connected : AtomicBoolean = AtomicBoolean(false)
private val factory: ConnectionFactory = ConnectionFactory()
private val connection: Connection
private var aChannel: Channel? = null // Channel is NOT thread safe
// ________________________________________________________________________________
// CONSTRUCTOR
// ________________________________________________________________________________
init {
factory.setUri(config_uri)
connection = factory.newConnection()
}
// ________________________________________________________________________________
// PUBLIC
// ________________________________________________________________________________
suspend fun connect(config_exchange: String) : Boolean {
log.info().log("connecting to $config_uri $config_exchange ")
exchangeName = config_exchange
uri = config_uri
aChannel = connection.createChannel()
createExchange(aChannel, exchangeName)
return true
} // end of connect
fun publish(aByteArray: ByteArray) {
aChannel?.basicPublish(exchangeName, "", byteArrayRabbitMessage, aByteArray) ?: throw RuntimeException("[basicPublish] - channel is null")
log.debug().log("[basicPublish] - sent bytearray of length ${aByteArray.size}")
}
fun publish(json: JsonObject) {
aChannel?.basicPublish(exchangeName, "", MessageProperties.TEXT_PLAIN , json.toBuffer().bytes) ?: throw RuntimeException("[basicPublish] - channel is null")
log.debug().log("[basicPublish] - sent $json")
}
private val byteArrayRabbitMessage : AMQP.BasicProperties =
AMQP.BasicProperties.Builder()
.contentType("application/octet-stream")
.deliveryMode(1) // nonpersistent
.priority(1)
.appId("eco-analytics-bridge")
.build()
private fun buildTextRabbitMessage() : AMQP.BasicProperties{
return AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(1)
.priority(1)
.appId("eco-analytics-bridge")
.build()
}
fun close() {
connection.close();
}
// ________________________________________________________________________________
// PRIVATE
// ________________________________________________________________________________
private fun createExchange(aChannel: Channel?, exchangeName: String) {
aChannel?.exchangeDeclare(exchangeName, "fanout", false) ?: throw RuntimeException("[exchangeDeclare] - channel is null")
log.info().log("[exchange created] - exchange=$exchangeName")
}
}
@lfmunoz
Copy link
Author

lfmunoz commented Aug 5, 2020

import java.io.Serializable
//________________________________________________________________________________
// RABBIT CONFIG
//________________________________________________________________________________
data class RabbitConfig(
        var amqp: String = "",
        var exchange: RabbitExchangeConfig,
        var queue: RabbitQueueConfig,
        var bufferSize: Int = 100,
        var compression: String = "NONE"
) : Serializable {
    constructor(params: ParameterTool) : this(
            params.get("amqp", "amqp://guest:guest@localhost:5672"),
            RabbitExchangeConfig(
                    params.get("exchange", "flink-rabbit-exchange"),
                    params.get("exchangeType", "fanout"),
                    params.getBoolean("exchangeDurable", true),
                    params.getBoolean("exchangeAutoDelete", false)
            ),
            RabbitQueueConfig(
                    params.get("queue", "flink-rabbit-consumer"),
                    params.getBoolean("queueDurable", false),
                    params.getBoolean("queueAutoDelete", false),
                    params.getInt("messageTtl", 60_000),
                    params.getInt("maxLength", 100_000)
            ),
            params.getInt("bufferSize", 100),
            params.get("rabbitCompressionType", "NONE")
    )
}

data class RabbitQueueConfig(
        var name: String = "",
        var durable: Boolean = false,
        var autoDelete: Boolean = false,
        var messageTtl: Int = 60_000,
        var maxLength: Int = 100_000
) : Serializable

data class RabbitExchangeConfig(
        var name: String = "",
        var type: String = "fanout",
        var durable: Boolean = false,
        var autoDelete: Boolean = false
) : Serializable

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment