Skip to content

Instantly share code, notes, and snippets.

@ept
Created July 15, 2009 10:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ept/147625 to your computer and use it in GitHub Desktop.
Save ept/147625 to your computer and use it in GitHub Desktop.
import java.util.logging.Logger
import com.rabbitmq.client.{AlreadyClosedException, Channel, Connection, ConnectionFactory,
ConnectionParameters, QueueingConsumer, ShutdownSignalException, AMQP}
object RabbitExample {
case class Config(host: String, port: Int)
val config = Config("localhost", 5672)
private val log = Logger.getLogger(this.getClass.getName)
private var connectionFactory: ConnectionFactory = null
private var connection: Connection = null
private var channel: Channel = null
/**
* Creates a new AMQP server connection.
*/
private def createRabbitConnection(config: Config) = {
if (connectionFactory == null) {
val params = new ConnectionParameters
params.setRequestedHeartbeat(0)
connectionFactory = new ConnectionFactory(params)
}
connectionFactory.newConnection(config.host, config.port)
}
/**
* Sends a message, serialized to a string, to a given queue on an AMQP network.
*/
def sendMessage(queueName: String, properties: AMQP.BasicProperties, msg: Array[Byte]) {
// Connection loop (reconnect if AMQP server connection goes down)
while (
try {
// Try connecting to server if necessary
if (connection == null) {
log.info("Producer connection to AMQP server on " + config.host + ":" + config.port)
connection = createRabbitConnection(config)
channel = connection.createChannel
}
// Send the message
channel.basicPublish("", queueName, properties, msg)
false // success -- don't repeat while loop
} catch {
case e: Exception =>
log.warning(e.getStackTrace.foldLeft("AMQP connection closed (%s) - reconnecting in 3 seconds" format(e.toString)) {
(a, b) => "%s\n%s" format(a, b.toString)
})
connection = null
true // failure -- repeat connection attempt
}
) {
try {
Thread.sleep(3000) // avoid spinning in a tight loop if the server is unavailable
} catch {
case e: InterruptedException =>
}
}
}
}
/**
Works for a while, then prints the following stack trace:
WARNING: com.rabbitmq.client.AlreadyClosedException (clean connection shutdown; reason: Attempt to use closed channel)
com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:189)
com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:284)
com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:392)
com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:376)
... our own code ...
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment