Created
July 15, 2009 10:17
-
-
Save ept/147625 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.logging.Logger | |
import com.rabbitmq.client.{AlreadyClosedException, Channel, Connection, ConnectionFactory, | |
ConnectionParameters, QueueingConsumer, ShutdownSignalException, AMQP} | |
object RabbitExample { | |
case class Config(host: String, port: Int) | |
val config = Config("localhost", 5672) | |
private val log = Logger.getLogger(this.getClass.getName) | |
private var connectionFactory: ConnectionFactory = null | |
private var connection: Connection = null | |
private var channel: Channel = null | |
/** | |
* Creates a new AMQP server connection. | |
*/ | |
private def createRabbitConnection(config: Config) = { | |
if (connectionFactory == null) { | |
val params = new ConnectionParameters | |
params.setRequestedHeartbeat(0) | |
connectionFactory = new ConnectionFactory(params) | |
} | |
connectionFactory.newConnection(config.host, config.port) | |
} | |
/** | |
* Sends a message, serialized to a string, to a given queue on an AMQP network. | |
*/ | |
def sendMessage(queueName: String, properties: AMQP.BasicProperties, msg: Array[Byte]) { | |
// Connection loop (reconnect if AMQP server connection goes down) | |
while ( | |
try { | |
// Try connecting to server if necessary | |
if (connection == null) { | |
log.info("Producer connection to AMQP server on " + config.host + ":" + config.port) | |
connection = createRabbitConnection(config) | |
channel = connection.createChannel | |
} | |
// Send the message | |
channel.basicPublish("", queueName, properties, msg) | |
false // success -- don't repeat while loop | |
} catch { | |
case e: Exception => | |
log.warning(e.getStackTrace.foldLeft("AMQP connection closed (%s) - reconnecting in 3 seconds" format(e.toString)) { | |
(a, b) => "%s\n%s" format(a, b.toString) | |
}) | |
connection = null | |
true // failure -- repeat connection attempt | |
} | |
) { | |
try { | |
Thread.sleep(3000) // avoid spinning in a tight loop if the server is unavailable | |
} catch { | |
case e: InterruptedException => | |
} | |
} | |
} | |
} | |
/** | |
Works for a while, then prints the following stack trace: | |
WARNING: com.rabbitmq.client.AlreadyClosedException (clean connection shutdown; reason: Attempt to use closed channel) | |
com.rabbitmq.client.impl.AMQChannel.ensureIsOpen(AMQChannel.java:189) | |
com.rabbitmq.client.impl.AMQChannel.transmit(AMQChannel.java:284) | |
com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:392) | |
com.rabbitmq.client.impl.ChannelN.basicPublish(ChannelN.java:376) | |
... our own code ... | |
*/ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment