Skip to content

Instantly share code, notes, and snippets.

@lfmunoz
Created July 9, 2020 03:10
Show Gist options
  • Save lfmunoz/c7922382a13e2c576ff77b3b9f3f99ae to your computer and use it in GitHub Desktop.
Save lfmunoz/c7922382a13e2c576ff77b3b9f3f99ae to your computer and use it in GitHub Desktop.
Apache Flink Rabbit Consumer
import com.google.common.io.ByteStreams
import com.rabbitmq.client.*
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.slf4j.LoggerFactory
import java.io.IOException
import java.util.concurrent.CountDownLatch
/**
 * Bare RabbitMQ consumer that forwards message bodies into a Flink source context.
 *
 * Constructing an instance connects to the broker at [config_uri], opens a channel,
 * declares the exchange and the queue, binds them with an empty routing key and
 * applies a prefetch limit. Messages are acknowledged manually, one by one.
 *
 * References:
 * - https://www.rabbitmq.com/java-client.html
 * - https://www.rabbitmq.com/api-guide.html
 *
 * @param consumerTag unique identifier of this consumer; also used as a log prefix
 * @param config_uri AMQP URI passed to [ConnectionFactory.setUri]
 * @param queue settings of the queue to declare and consume from
 * @param exchange settings of the exchange the queue is bound to
 * @param prefetchSize value passed to `Channel.basicQos(int)`, i.e. the maximum number
 *   of unacknowledged deliveries the broker hands out on this channel
 */
class RabbitConsumerBare(
    private val consumerTag: String, // unique identifier of consumer
    private val config_uri: String,
    private val queue: RabbitQueueConfig,
    private val exchange: RabbitExchangeConfig,
    private val prefetchSize: Int = 10
) {
    private val factory: ConnectionFactory
    private val connection: Connection
    private val channel: Channel

    init {
        // NOTE(review): the URI is logged verbatim; if it embeds credentials they end up in the logs.
        log.info("[$consumerTag] - connect to RabbitMQ - uri=$config_uri")
        factory = ConnectionFactory().apply {
            setUri(config_uri)
            // Recovery attempts use the client's network recovery interval (5 seconds by default).
            isAutomaticRecoveryEnabled = true
        }
        connection = factory.newConnection()
        // From here on the connection is open: abort it if the rest of the setup fails,
        // otherwise a half-constructed instance would leak the connection.
        channel = try {
            connection.createChannel()
        } catch (e: Exception) {
            connection.abort()
            throw e
        }
        try {
            createQueue()
            // Accept at most prefetchSize unacknowledged messages at a time.
            channel.basicQos(prefetchSize)
        } catch (e: Exception) {
            connection.abort()
            throw e
        }
    }

    // ________________________________________________________________________________
    // PUBLIC
    // ________________________________________________________________________________

    /**
     * Registers a consumer on the queue and blocks the calling thread until the
     * consumer receives a shutdown signal.
     *
     * Every non-null body is decompressed according to the message's content encoding
     * (when that encoding is not [Compression.NONE]) and emitted through [ctx]; the
     * delivery is then acknowledged individually, including deliveries with a null body.
     *
     * The original code wrapped this logic in `Thread { ... }.run()`, which executes on
     * the calling thread; the wrapper was dropped, the blocking behaviour is unchanged.
     *
     * NOTE(review): `handleDelivery` is invoked by the RabbitMQ client on its own thread.
     * Confirm whether emission should hold `ctx.checkpointLock` when checkpointing is used.
     *
     * @param ctx Flink source context that receives the message payloads
     */
    fun consumeChannelWithCallback(ctx: SourceFunction.SourceContext<ByteArray>) {
        val shutdownLatch = CountDownLatch(1)
        channel.basicConsume(queue.name, AUTO_ACK, consumerTag, object : DefaultConsumer(channel) {
            override fun handleDelivery(
                consumerTag: String?,
                envelope: Envelope,
                properties: AMQP.BasicProperties,
                body: ByteArray?
            ) {
                val deliveryTag: Long = envelope.deliveryTag
                // Decompress the payload if the message is compressed.
                body?.let { payload ->
                    val compression = Compression.of(properties.contentEncoding)
                    if (compression != Compression.NONE) {
                        ctx.collect(decompress(compression, payload))
                    } else {
                        ctx.collect(payload)
                    }
                }
                // We acknowledge each message individually (can be modified to ack batches).
                channel.basicAck(deliveryTag, false)
            }

            override fun handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException) {
                log.warn("[$consumerTag] - shutdown signal: {}", sig)
                // Releases the thread blocked in consumeChannelWithCallback.
                shutdownLatch.countDown()
            }
        })
        shutdownLatch.await()
    }

    /**
     * Closes the channel and then the connection.
     *
     * The connection is closed even when closing the channel fails, and resources that
     * are already closed (for example after a shutdown signal) are skipped instead of
     * raising an already-closed error that would leave the connection open.
     */
    fun close() {
        try {
            if (channel.isOpen) {
                channel.close()
            }
        } finally {
            if (connection.isOpen) {
                connection.close()
            }
        }
    }

    // ________________________________________________________________________________
    // PRIVATE
    // ________________________________________________________________________________

    /**
     * Declares the exchange and the queue and binds them with an empty routing key.
     *
     * `x-message-ttl` and `x-max-length` are sent as queue arguments only when the
     * corresponding setting is non-zero. The original code called `Map.plus` and
     * discarded the returned map, so these arguments were never applied.
     *
     * NOTE(review): a queue that already exists on the broker with different arguments
     * makes the declaration fail; queues created by the previous (argument-less)
     * version may need to be deleted or migrated.
     */
    private fun createQueue() {
        val queueArgs = mutableMapOf<String, Any>()
        if (queue.messageTtl != 0) {
            queueArgs["x-message-ttl"] = queue.messageTtl
        }
        if (queue.maxLength != 0) {
            queueArgs["x-max-length"] = queue.maxLength
        }
        channel.exchangeDeclare(exchange.name, exchange.type, exchange.durable)
        val queueOk = channel.queueDeclare(queue.name, queue.durable, QUEUE_IS_EXCLUSIVE, queue.autoDelete, queueArgs)
        channel.queueBind(queueOk.queue, exchange.name, "")
        log.info("[queue created] - queue=${queue}")
    }

    companion object {
        private val log = LoggerFactory.getLogger(RabbitConsumerBare::class.java)
        private const val QUEUE_IS_EXCLUSIVE: Boolean = false

        // Required to be false for QoS, we will manually acknowledge batches of messages
        private const val AUTO_ACK: Boolean = false

        /**
         * Reads [data] through the decompressing stream selected by [compression] and
         * returns all resulting bytes.
         *
         * The stream is closed once it has been fully read so that resources held by
         * the decompressor are released.
         *
         * @param compression compression scheme handed to `CompressionUtil.wrap`
         * @param data raw (possibly compressed) message payload
         * @return the bytes read from the wrapped stream
         * @throws IOException if reading from the wrapped stream fails
         */
        @Throws(IOException::class)
        fun decompress(compression: Compression?, data: ByteArray?): ByteArray? {
            return CompressionUtil.wrap(compression, FastByteArrayInputStream(data)).use { stream ->
                ByteStreams.toByteArray(stream)
            }
        }
    }
} // end of RabbitConsumerBare
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment