@dbousamra
Created October 13, 2015 21:18

import java.util.concurrent.Executors

// MetricsCollectorImpl, MetricsCollectorConfiguration, IronMqConfig, IronMqMessaging,
// QueueName and StreamMq come from the author's codebase and are not shown in this gist.
object Example {

  implicit val es = Executors.newCachedThreadPool

  val metrics = new MetricsCollectorImpl(MetricsCollectorConfiguration("example", "localhost", 8125))
  val ironConfig: IronMqConfig = ???
  val iron = IronMqMessaging(ironConfig, QueueName("example-queue"))

  def main(args: Array[String]): Unit = {
    val consumer = StreamMq(iron, metrics).consumer
    consumer.consume { message =>
      println(message)
      Ack()
    }
  }
}
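
The Messaging, MetricsCollector, Consumer and AckResponse types that StreamConsumer depends on are not part of the gist. The sketch below is only an inferred approximation of their shapes, reconstructed from how StreamConsumer uses them; the names Message, MessageId and MessagingError are hypothetical stand-ins for whatever the real codebase defines.

import scalaz.\/

case class Message(body: String)
case class MessageId(value: String)

sealed trait AckResponse
case class Ack() extends AckResponse
case class Reject() extends AckResponse
case class Requeue() extends AckResponse

// reserve is assumed to return a scalaz disjunction whose left side carries a Throwable,
// since the consumer calls error.reason.getMessage on failures.
case class MessagingError(reason: Throwable)

trait Messaging {
  // StreamConsumer pattern-matches each container as a (message, id) pair.
  type MessageContainer = (Message, MessageId)

  def reserve: MessagingError \/ Seq[MessageContainer]
  def ack(messages: Seq[MessageContainer]): Unit
  def reject(messages: Seq[MessageContainer]): Unit
  def requeue(messages: Seq[MessageContainer]): Unit
}

trait MetricsCollector {
  def increment(key: String): Unit
  def incrementBy(key: String, count: Int): Unit
}

trait Consumer {
  def consume(work: Message => AckResponse): Unit
}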

import java.util.concurrent.ExecutorService

import scala.util.Try

import com.typesafe.scalalogging.LazyLogging // scala-logging 3.x package path assumed

import scalaz.concurrent.Task
import scalaz.stream._

case class StreamConsumer(messaging: Messaging, metrics: MetricsCollector, concurrency: Int = 8)(implicit val ec: ExecutorService) extends Consumer with LazyLogging {

  type MessageContainer = messaging.MessageContainer

  /*
   * A stream of MessageContainers, defined in terms of the reserve method.
   */
  def reserveStream: Process[Task, MessageContainer] = {
    Process.repeatEval {
      Task {
        messaging.reserve.fold(error => {
          logger.error("Failed to reserve messages: " + error.reason.getMessage)
          metrics.increment("messaging.reserved.error")
          Nil
        }, identity)
      }
    }.flatMap { reserved =>
      logger.debug(s"Reserved ${reserved.size} messages")
      metrics.incrementBy("messaging.reserved.success", reserved.size)
      Process.emitAll(reserved)
    }
  }

  /*
   * A sink of acknowledgement side effects (in scalaz-stream a Sink is a stream of
   * effectful functions returning Unit).
   * This sink takes a Seq of MessageContainers and acknowledges them in batch.
   */
  def ackSink: Sink[Task, Seq[MessageContainer]] = liftIntoSink(messaging.ack, "ack")

  /*
   * A sink of rejection side effects.
   * This sink takes a Seq of MessageContainers and rejects them in batch.
   */
  def rejectSink: Sink[Task, Seq[MessageContainer]] = liftIntoSink(messaging.reject, "reject")

  /*
   * A sink of requeue side effects.
   * This sink takes a Seq of MessageContainers and requeues them in batch.
   */
  def requeueSink: Sink[Task, Seq[MessageContainer]] = liftIntoSink(messaging.requeue, "requeue")

  /*
   * Lifts a batch action (ack, reject or requeue) into a Sink, recording success and
   * failure metrics around it.
   */
  def liftIntoSink(action: Seq[MessageContainer] => Unit, actionType: String): Sink[Task, Seq[MessageContainer]] = {
    sink.lift { messages: Seq[MessageContainer] =>
      Task {
        action(messages)
        logger.debug(s"${actionType} ${messages.length} messages")
        metrics.incrementBy(s"messaging.${actionType}.success", messages.length)
      }.handle {
        case e: Throwable =>
          logger.error(s"Unable to ${actionType} messages: ${e.getMessage}")
          metrics.incrementBy(s"messaging.${actionType}.error", messages.length)
      }
    }
  }

  /*
   * Consume messages from the queue. Takes a work function that must respond with an
   * AckResponse. Operates in parallel and deals with backpressure via bounded queues.
   */
  def consume(work: Message => AckResponse) = {
    val reserveQueue = async.boundedQueue[MessageContainer](1000)
    val ackQueue = async.boundedQueue[MessageContainer](1000)
    val rejectQueue = async.boundedQueue[MessageContainer](1000)
    val requeueQueue = async.boundedQueue[MessageContainer](1000)

    // Continuously reserve messages and push them onto the reserve queue.
    val reserveEnqueueProcess = reserveStream.to(reserveQueue.enqueue)

    // Run the work function over each reserved message; a thrown exception is treated
    // as a rejection.
    val processed = reserveQueue.dequeue.evalMap { case (message, id) =>
      Task.delay {
        Try(work(message)).toOption match {
          case Some(response) =>
            metrics.increment("messaging.worked.success")
            (response, message, id)
          case None =>
            metrics.increment("messaging.worked.error")
            (Reject(), message, id)
        }
      }
    }

    // Route each worked message onto the queue that matches its AckResponse.
    val ackResponseEnqueueProcess = processed.collect {
      case (Ack(), message, id) => ackQueue.enqueueOne((message, id))
      case (Reject(), message, id) => rejectQueue.enqueueOne((message, id))
      case (Requeue(), message, id) => requeueQueue.enqueueOne((message, id))
    }.flatMap(i => Process.eval_(i))

    // Drain each response queue in batches of up to 100 into its sink.
    val ackProcess = ackQueue.dequeueBatch(100).to(ackSink)
    val rejectProcess = rejectQueue.dequeueBatch(100).to(rejectSink)
    val requeueProcess = requeueQueue.dequeueBatch(100).to(requeueSink)

    // Merge every pipeline into one process and run `concurrency` copies of it.
    val merged = (reserveEnqueueProcess merge ackResponseEnqueueProcess) merge (ackProcess merge rejectProcess merge requeueProcess)
    val parallelMerged = scalaz.stream.merge.mergeN(concurrency)(Process.constant(merged))
    parallelMerged.run.run
  }
}
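
The final lines of consume do the heavy lifting: the individual pipelines are merged into one Process, and mergeN drives several copies of it concurrently. The standalone sketch below is an illustration against scalaz-stream 0.7.x, not part of the original gist, using hypothetical names (printerSink, evens, odds) so the merge/mergeN pattern can be run and observed in isolation.

import scalaz.concurrent.Task
import scalaz.stream._

object MergeSketch extends App {
  // A sink that simply prints each element it receives.
  val printerSink: Sink[Task, Int] = sink.lift((i: Int) => Task.delay(println(s"got $i")))

  // Two independent pipelines, each streaming some numbers into the printing sink.
  val evens: Process[Task, Unit] = Process.range(0, 10).filter(_ % 2 == 0).to(printerSink)
  val odds: Process[Task, Unit] = Process.range(0, 10).filter(_ % 2 == 1).to(printerSink)

  // mergeN runs up to the given number (here 2) of inner processes at once; with a
  // finite outer stream it terminates once every inner process has completed. consume
  // above uses the same call with Process.constant(merged) to keep `concurrency` copies
  // of its merged pipeline running.
  merge.mergeN(2)(Process.emitAll(Seq(evens, odds))).run.run
}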