Skip to content

Instantly share code, notes, and snippets.

@sstone
Created September 21, 2015 18:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sstone/f4b9448c696911b265a5 to your computer and use it in GitHub Desktop.
Save sstone/f4b9448c696911b265a5 to your computer and use it in GitHub Desktop.
montoring rabbitmq jobs
package com.github.sstone.amqp
import java.util.UUID
import akka.actor.{ActorLogging, Actor, ActorSystem, Props}
import com.github.sstone.amqp.Amqp._
import com.github.sstone.amqp.RpcServer.{IProcessor, ProcessResult}
import com.rabbitmq.client.{AMQP, BasicProperties, ConnectionFactory}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
object Test1 extends App {
implicit val system = ActorSystem("mySystem")
val conn = system.actorOf(ConnectionOwner.props(new ConnectionFactory(), reconnectionDelay = 5 seconds), "connection")
// create 2 servers
val processor = new IProcessor {
override def onFailure(delivery: Delivery, e: Throwable): ProcessResult = ???
override def process(delivery: Delivery): Future[ProcessResult] = {
Thread.sleep(100)
Future.successful(ProcessResult(Some(delivery.body))) // here we could send anything back
}
}
val queueParams = QueueParameters("request_queue", passive = false, durable = false, exclusive = false, autodelete = true)
val server1 = ConnectionOwner.createChildActor(
conn,
RpcServer.props(queueParams, StandardExchanges.amqDirect, "my_key", processor, ChannelParameters(qos = 1)))
val server2 = ConnectionOwner.createChildActor(
conn,
RpcServer.props(queueParams, StandardExchanges.amqDirect, "my_key", processor, ChannelParameters(qos = 1)))
val producer = ConnectionOwner.createChildActor(conn, ChannelOwner.props())
// create a client that will handle job batches
case class Job(expected: Int, received: Vector[Array[Byte]])
class MyClient extends Actor with ActorLogging {
val jobs = new collection.mutable.HashMap[String, Job]
def receive = {
case ('work, data: Array[Byte]) =>
val items = data.grouped(20).toList
val correlationId = UUID.randomUUID().toString
val properties = new AMQP.BasicProperties.Builder().correlationId(correlationId).replyTo("response_queue").build()
items.map(item => producer ! Publish("", "request_queue", item, Some(properties)))
jobs += correlationId -> Job(items.size, Vector.empty[Array[Byte]])
case Delivery(tag, envelope, properties, body) if !jobs.contains(properties.getCorrelationId) =>
log.warning(s"received response for unknown job ${properties.getCorrelationId}")
case Delivery(tag, envelope, properties, body) =>
val job = jobs(properties.getCorrelationId)
val received = job.received :+ body
if (received.size == job.expected) {
log.info(s"job ${properties.getCorrelationId} has been completed")
jobs -= properties.getCorrelationId
} else {
log.info(s"job ${properties.getCorrelationId}: ${received.size} sub-jobs have been completed out of ${job.expected}")
jobs.update(properties.getCorrelationId, job.copy(received = received))
}
}
}
val client = system.actorOf(Props[MyClient])
val responseQueueParams = QueueParameters("response_queue", passive = false, durable = false, exclusive = false, autodelete = true)
val consumer = ConnectionOwner.createChildActor(conn, Consumer.props(client, channelParams = None, autoack = true))
// wait till everyone is actually connected to the broker
Amqp.waitForConnection(system, consumer).await()
consumer ! Record(AddQueue(responseQueueParams))
Thread.sleep(1000)
val data = new Array[Byte](100)
client ! ('work, data)
client ! ('work, data)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment