Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active April 2, 2023 10:14
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dacr/2be9cbe9eb38c8a893008a9197cf20e2 to your computer and use it in GitHub Desktop.
Save dacr/2be9cbe9eb38c8a893008a9197cf20e2 to your computer and use it in GitHub Desktop.
learning rabbitmq through java api. / published by https://github.com/dacr/code-examples-manager #93f22a3a-e4e4-4f16-96a0-b78f3d7ae916/3aaab1e7b068aa9eff811df539670fd7c745592
// summary : learning rabbitmq through java api.
// keywords : scala, scalatest, fixture, rabbitmq, rabbitmq-java-api, learning, docker, async, @testable
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 93f22a3a-e4e4-4f16-96a0-b78f3d7ae916
// created-on : 2020-08-14T20:16:33Z
// managed-by : https://github.com/dacr/code-examples-manager
// execution : scala ammonite script (http://ammonite.io/) - run as follow 'amm scriptname.sc'
// Inspired from https://www.rabbitmq.com/api-guide.html
import $ivy.`org.scalatest::scalatest:3.2.6`
import $ivy.`com.whisk::docker-testkit-impl-spotify:0.9.9`
import $ivy.`org.json4s::json4s-jackson:3.6.10`
import $ivy.`com.rabbitmq:amqp-client:5.9.0`
import $ivy.`org.slf4j:slf4j-nop:1.7.30`
import $ivy.`javax.activation:activation:1.1.1`
import org.scalatest._
import flatspec._
import matchers._
import OptionValues._
import com.whisk.docker._
import org.json4s.JValue
import org.json4s.jackson.Serialization
import com.rabbitmq.client._
import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try, Using}
// ==================================================================================================
// DockerTestKit is defined locally (instead of pulled from a docker-testkit artifact) to keep control over which scalatest release is used
/**
 * ScalaTest lifecycle glue for a docker-testkit based suite.
 *
 * Mixed into a [[Suite]], it calls `startAllOrFail()` once before the first test and
 * `stopAllQuietly()` once after the last one (both are inherited, presumably from `DockerKit`).
 */
trait DockerTestKit extends BeforeAndAfterAll with org.scalatest.concurrent.ScalaFutures with DockerKit {
  self: Suite =>

  import org.scalatest.time.{Millis, Seconds, Span}

  /** Patience configuration: 20 seconds timeout, 10 milliseconds polling interval (both scaled). */
  def dockerInitPatienceInterval =
    PatienceConfig(
      timeout = scaled(Span(20, Seconds)),
      interval = scaled(Span(10, Millis))
    )

  /** Patience configuration: 1200 seconds timeout, 250 milliseconds polling interval (both scaled). */
  def dockerPullImagesPatienceInterval =
    PatienceConfig(
      timeout = scaled(Span(1200, Seconds)),
      interval = scaled(Span(250, Millis))
    )

  /** Runs the parent `beforeAll` first, then starts the containers. */
  override def beforeAll(): Unit = {
    super.beforeAll()
    startAllOrFail()
  }

  /** Stops the containers first, then runs the parent `afterAll`. */
  override def afterAll(): Unit = {
    stopAllQuietly()
    super.afterAll()
  }
}
// ==================================================================================================
// docker run -it --rm -p 5672:5672 -e "RABBITMQ_DEFAULT_USER=rabbit" -e "RABBITMQ_DEFAULT_PASS=bunny" -e "RABBITMQ_DEFAULT_VHOST=my-vhost" rabbitmq:3.8.6
/**
 * Declares the RabbitMQ container used by the tests, together with the connection
 * settings (port, credentials, virtual host) the tests need to reach it.
 */
trait DockerRabbitService extends com.whisk.docker.impl.spotify.DockerKitSpotify {
  val rabbitPort: Int = 5672
  val rabbitUser: String = "rabbit"
  val rabbitPass: String = "bunny"
  val rabbitVirtualHost: String = "my-vhost"
  val rabbitHostname: String = "my-rabbit"

  /** Environment entries, in `NAME=value` form, handed to the container. */
  val env: Array[String] = Array(
    "RABBITMQ_DEFAULT_USER" -> rabbitUser,
    "RABBITMQ_DEFAULT_PASS" -> rabbitPass,
    "RABBITMQ_DEFAULT_VHOST" -> rabbitVirtualHost
    //"RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS=-rabbit auth_mechanisms ['PLAIN', 'AMQPLAIN']",
  ).map { case (name, value) => s"$name=$value" }

  /** The `rabbitmq:3.8.6` container definition, considered ready once its log contains "Server startup complete". */
  lazy val rabbitContainer: DockerContainer = {
    // container port 5672 mapped to rabbitPort on 127.0.0.1
    val amqpPortBinding = 5672 -> DockerPortMapping(Some(rabbitPort), "127.0.0.1")
    val startupChecker = DockerReadyChecker.LogLineContains("Server startup complete")
    DockerContainer("rabbitmq:3.8.6")
      .withPortMapping(amqpPortBinding)
      .withHostname(rabbitHostname)
      .withEnv(env: _*) // take care .withEnv overwrites any previous .withEnv ... in the current release
      .withReadyChecker(startupChecker)
  }

  /** Only the RabbitMQ container is managed by this service. */
  override def dockerContainers: List[DockerContainer] = List(rabbitContainer)
}
// ==================================================================================================
/**
 * Learning tests for the RabbitMQ Java client, run against the dockerized broker
 * declared by `DockerRabbitService`.
 *
 * Every test receives a [[FixtureParam]] whose `createChannel` opens a new channel on a
 * connection dedicated to that test; the channels and the connection are released once
 * the test outcome is complete.
 */
class LearningRabbitmqThroughTests extends FixtureAsyncFlatSpec with should.Matchers with DockerRabbitService with DockerTestKit {
  import java.nio.charset.StandardCharsets.UTF_8

  override def suiteName = "LearningRabbitmqThroughTests"

  implicit val serialization: org.json4s.jackson.Serialization.type = org.json4s.jackson.Serialization
  implicit val formats: org.json4s.DefaultFormats.type = org.json4s.DefaultFormats
  implicit val ec: scala.concurrent.ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global

  /** Connection factory configured with the credentials, virtual host and port declared by `DockerRabbitService`. */
  lazy val connectionFactory: ConnectionFactory = {
    val factory = new ConnectionFactory()
    factory.setUsername(rabbitUser)
    factory.setPassword(rabbitPass)
    factory.setVirtualHost(rabbitVirtualHost)
    factory.setHost("localhost")
    factory.setPort(rabbitPort)
    factory
  }

  /** Fixture handed to each test.
    *
    * @param createChannel opens a new channel on the connection dedicated to the current test
    */
  case class FixtureParam(createChannel: () => Channel)

  /**
   * Opens one connection per test, loans a channel factory to the test and releases
   * everything once the test outcome is complete.
   *
   * @param test the test to run with the fixture
   * @return the test outcome, completed only after the cleanup has run
   */
  def withFixture(test: OneArgAsyncTest): FutureOutcome = {
    val connection = connectionFactory.newConnection()
    var channels = List.empty[Channel]
    val channelFactory = () => {
      val channel = connection.createChannel()
      channels ::= channel // remembered so that it can be closed after the test
      channel
    }
    val theFixture = FixtureParam(channelFactory)
    // complete/lastly ties the cleanup to the returned outcome: the suite waits for it,
    // and it also runs when the test fails or when withFixture throws synchronously.
    complete {
      withFixture(test.toNoArgAsyncTest(theFixture)) // "loan" the fixture to the test
    } lastly {
      // Channel closing is best effort (a channel may already have been closed);
      // the connection must be closed in every case.
      try channels.foreach(channel => Try(channel.close()))
      finally connection.close()
    }
  }

  // --------------------------------------------------------------------------------------------------
  "rabbitmq" should "be ready" in { fixture =>
    info("Take care rabbitmq channel are not thread safe")
    val channel = fixture.createChannel()
    channel.isOpen shouldBe true
  }

  // --------------------------------------------------------------------------------------------------
  it should "be possible to create a default server-named queue" in { fixture =>
    note("queueDeclare() creates a server-named exclusive, autodelete, non-durable queue")
    note("This queue can only by used by the current connection")
    note("This queue is automatically deleted as soon as it is not longer in use")
    note("This queue will be automatically deleted on server stops")
    val channel = fixture.createChannel()
    val state = channel.queueDeclare()
    val queueName = state.getQueue
    note(s"A queue named $queueName has been created, it will be automatically deleted on connection close")
    succeed
  }

  // --------------------------------------------------------------------------------------------------
  it should "be possible to create/delete a client-named durable queue" in { fixture =>
    val channel = fixture.createChannel()
    val queueName = "MyQueue"
    try {
      // arguments : queue name, durable, exclusive, autoDelete, extra arguments
      channel.queueDeclare(queueName, true, false, false, null)
      note("durable : true if we are declaring a durable queue (the queue will survive a server restart)")
      note("exclusive : true if we are declaring an exclusive queue (restricted to this connection)")
      note("autoDelete : true if we are declaring an autodelete queue (server will delete it when no longer in use)")
      note(s"A queue named $queueName has been created")
      succeed
    } finally {
      // the queue is neither exclusive nor autodelete, so it has to be removed explicitly
      channel.queueDelete(queueName)
    }
  }

  // --------------------------------------------------------------------------------------------------
  it should "be possible to do basic publish/consume operations" in { fixture =>
    val promise = Promise[String]()
    note("queueDeclare dynamically create a queue whose name (see getQueue) is automatically generated")
    val consumerChannel = fixture.createChannel()
    val queueName = consumerChannel.queueDeclare().getQueue
    // the promise is completed with the body of the first delivered message
    val consumer: DeliverCallback = (tag, delivery) => promise.success(new String(delivery.getBody, UTF_8))
    consumerChannel.basicConsume(queueName, true, consumer, (tag: String) => {})
    note("For very basic publishing, the routing key is the queueName, and this is only what you need")
    val publisherChannel = fixture.createChannel()
    publisherChannel.basicPublish("", queueName, null, "hello".getBytes(UTF_8))
    promise.future.map(result => result shouldBe "hello")
  }

  // --------------------------------------------------------------------------------------------------
  it should "be possible to broadcast messages" in { fixture =>
    val promiseA = Promise[String]()
    val promiseB = Promise[String]()
    note("To broadcast just create a dedicated exchange with fanout type and bind queues to it")
    note("Here as everything is going through the same connection, we can use default queueDeclare operation")
    val adminChannel = fixture.createChannel()
    val queueNameA = adminChannel.queueDeclare().getQueue
    val queueNameB = adminChannel.queueDeclare().getQueue
    adminChannel.exchangeDeclare("broadcast", "fanout")
    adminChannel.queueBind(queueNameA, "broadcast", "")
    adminChannel.queueBind(queueNameB, "broadcast", "")
    note("java client rabbitmq channels only requires names for queues or exchanges")
    // builds a delivery callback completing the given promise with the message body
    val consumer: Promise[String] => DeliverCallback =
      promise => (tag, delivery) => promise.success(new String(delivery.getBody, UTF_8))
    val consumerChannelA = fixture.createChannel()
    consumerChannelA.basicConsume(queueNameA, true, consumer(promiseA), (tag: String) => {})
    val consumerChannelB = fixture.createChannel()
    consumerChannelB.basicConsume(queueNameB, true, consumer(promiseB), (tag: String) => {})
    val publisherChannel = fixture.createChannel()
    publisherChannel.basicPublish("broadcast", "", null, "ok".getBytes(UTF_8))
    Future.sequence(List(promiseA, promiseB).map(_.future)).map { results =>
      results shouldBe List("ok", "ok")
    }
  }
}
// Run the suite through the ScalaTest command line runner when the script is executed.
org.scalatest.tools.Runner.main(
  Array(
    "-oDF", // standard output reporter, showing all durations (D) and full stack traces (F)
    "-s", // the next argument is the name of the suite to run
    classOf[LearningRabbitmqThroughTests].getName
  )
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment