Created
July 9, 2020 03:10
-
-
Save lfmunoz/c7922382a13e2c576ff77b3b9f3f99ae to your computer and use it in GitHub Desktop.
Apache Flink Rabbit Consumer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.common.io.ByteStreams | |
import com.rabbitmq.client.* | |
import org.apache.flink.configuration.Configuration | |
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction | |
import org.apache.flink.streaming.api.functions.source.SourceFunction | |
import org.slf4j.LoggerFactory | |
import java.io.IOException | |
import java.util.concurrent.CountDownLatch | |
/** | |
* https://www.rabbitmq.com/java-client.html | |
* https://www.rabbitmq.com/api-guide.html | |
* Rabbit Consumer Bare Implementation | |
*/ | |
class RabbitConsumerBare( | |
private val consumerTag: String, // unique identifier of consumer | |
private val config_uri: String, | |
private val queue: RabbitQueueConfig, | |
private val exchange: RabbitExchangeConfig, | |
private val prefetchSize: Int = 10 | |
) { | |
companion object { | |
private val log = LoggerFactory.getLogger(RabbitConsumerBare::class.java) | |
private const val queueIsExclusive: Boolean = false | |
// Required to be false for QoS, we will manually acknowledge batches of messages | |
private const val autoAck: Boolean = false | |
@Throws(IOException::class) | |
fun decompress(compression: Compression?, data: ByteArray?): ByteArray? { | |
return ByteStreams.toByteArray(CompressionUtil.wrap(compression, FastByteArrayInputStream(data))) | |
} | |
} | |
private val factory: ConnectionFactory | |
private var connection: Connection | |
private val channel: Channel | |
init { | |
log.info("[$consumerTag] - connect to RabbitMQ - uri=$config_uri") | |
factory = ConnectionFactory().apply { | |
setUri(config_uri) | |
// Attempt recovery every 5 seconds | |
isAutomaticRecoveryEnabled = true | |
} | |
connection = factory.newConnection() | |
channel = connection.createChannel() | |
createQueue() | |
// Accepts prefetchSize unack-ed message at a time | |
channel.basicQos(prefetchSize) | |
} | |
// ________________________________________________________________________________ | |
// PUBLIC | |
// ________________________________________________________________________________ | |
fun consumeChannelWithCallback(ctx: SourceFunction.SourceContext<ByteArray>) { | |
Thread { | |
val latch = CountDownLatch(1) | |
channel.basicConsume(queue.name, autoAck, consumerTag, object : DefaultConsumer(channel) { | |
override fun handleDelivery( | |
consumerTag: String?, | |
envelope: Envelope, | |
properties: AMQP.BasicProperties, | |
body: ByteArray? | |
) { | |
val deliveryTag: Long = envelope.deliveryTag | |
//decompress if it the message is compressed | |
body?.let { | |
val compression = Compression.of(properties.contentEncoding) | |
if (compression != Compression.NONE) { | |
ctx.collect(decompress(compression, it)) | |
} else { | |
ctx.collect(it) | |
} | |
} | |
// We acknowledge each message individually (can be modified to ack batches) | |
channel.basicAck(deliveryTag, false) | |
} | |
override fun handleShutdownSignal(consumerTag: String, sig: ShutdownSignalException) { | |
log.warn("[$consumerTag] - shutdown signal: {}", sig) | |
latch.countDown() | |
} | |
}) | |
latch.await() | |
}.run() | |
} | |
fun close() { | |
channel.close() | |
connection.close() | |
} | |
// ________________________________________________________________________________ | |
// PRIVATE | |
// ________________________________________________________________________________ | |
private fun createQueue() { | |
val queueArgs: Map<String, Any> = mutableMapOf() | |
if (queue.messageTtl != 0) { | |
queueArgs.plus("x-message-ttl" to queue.messageTtl) | |
} | |
if (queue.maxLength != 0) { | |
queueArgs.plus("x-max-length" to queue.maxLength) | |
} | |
channel.exchangeDeclare(exchange.name, exchange.type, exchange.durable) | |
val queueOk = channel.queueDeclare(queue.name, queue.durable, queueIsExclusive, queue.autoDelete, queueArgs) | |
channel.queueBind(queueOk.queue, exchange.name, "") | |
log.info("[queue created] - queue=${queue}") | |
} | |
} // end of RabbitConsumerBare | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment