Created
October 13, 2015 21:18
-
-
Save dbousamra/1d3d8b5f8d0d5b454463 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Minimal usage example: builds a consumer for the "example-queue" IronMQ queue, prints every
 * message it receives and acknowledges it.
 */
object Example {

  // Executor handed implicitly to the consumer. The type is spelled out so the implicit's type
  // is not left to inference.
  implicit val es: java.util.concurrent.ExecutorService = Executors.newCachedThreadPool

  val metrics = new MetricsCollectorImpl(MetricsCollectorConfiguration("example", "localhost", 8125))

  // Placeholder: `???` throws NotImplementedError when the object is initialised, so a real
  // IronMqConfig must be supplied before this example can run.
  val ironConfig: IronMqConfig = ???

  val iron = IronMqMessaging(ironConfig, QueueName("example-queue"))

  /**
   * Entry point. Consumes messages, printing and acknowledging each one.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val consumer = StreamMq(iron, metrics).consumer
    try {
      consumer.consume { message =>
        println(message)
        Ack()
      }
    } finally {
      // Release the pool once consumption ends or fails so its non-daemon worker threads do not
      // keep the JVM alive. NOTE(review): assumes `consume` blocks until the stream terminates,
      // as StreamConsumer.consume does - confirm for the type returned by StreamMq#consumer.
      es.shutdown()
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A [[Consumer]] built on scalaz-stream: it reserves messages from `messaging`, hands each one to
 * a caller-supplied work function, and acks / rejects / requeues it according to the response.
 *
 * @param messaging   backend used to reserve, ack, reject and requeue messages
 * @param metrics     collector that receives the success / error counters
 * @param concurrency maximum number of pipeline copies that `consume` runs at once
 * @param ec          executor on which the `Task { ... }` blocks are evaluated
 */
case class StreamConsumer(messaging: Messaging, metrics: MetricsCollector, concurrency: Int = 8)(implicit val ec: ExecutorService) extends Consumer with LazyLogging {

  type MessageContainer = messaging.MessageContainer

  // Capacity of each in-memory bounded queue used by `consume`.
  private val QueueCapacity = 1000

  // Maximum number of messages handed to an ack / reject / requeue sink in one batch.
  private val BatchSize = 100

  /**
   * A stream of MessageContainers defined in terms of the reserve method.
   *
   * `messaging.reserve` is called repeatedly. A failed reserve is logged, counted under
   * "messaging.reserved.error" and treated as an empty batch, so the stream keeps going.
   */
  def reserveStream: Process[Task, MessageContainer] = {
    Process.repeatEval {
      Task {
        messaging.reserve.fold(error => {
          logger.error(s"Failed to reserve messages: ${error.reason.getMessage}")
          metrics.increment("messaging.reserved.error")
          Nil
        }, identity)
      }
    }.flatMap { reserved =>
      logger.debug(s"Reserved ${reserved.size} messages")
      metrics.incrementBy("messaging.reserved.success", reserved.size)
      Process.emitAll(reserved)
    }
  }

  /**
   * A sink (a stream of Unit side effects) that takes a Seq of MessageContainers and
   * acknowledges them in batch via `messaging.ack`.
   */
  def ackSink: Sink[Task, Seq[MessageContainer]] = liftIntoSink(messaging.ack, "ack")

  /**
   * A sink that takes a Seq of MessageContainers and rejects them in batch via
   * `messaging.reject`.
   */
  def rejectSink: Sink[Task, Seq[MessageContainer]] = liftIntoSink(messaging.reject, "reject")

  /**
   * A sink that takes a Seq of MessageContainers and requeues them in batch via
   * `messaging.requeue`.
   */
  def requeueSink: Sink[Task, Seq[MessageContainer]] = liftIntoSink(messaging.requeue, "requeue")

  /**
   * Wraps a batch action in a sink that logs and records metrics for each batch.
   *
   * @param action     side effect applied to each batch of messages
   * @param actionType label used in log lines and in the metric keys
   *                   "messaging.<actionType>.success" / "messaging.<actionType>.error"
   * @return a sink that applies `action` to every batch it receives; non-fatal failures are
   *         logged and counted instead of failing the stream
   */
  def liftIntoSink(action: Seq[MessageContainer] => Unit, actionType: String): Sink[Task, Seq[MessageContainer]] = {
    sink.lift { messages: Seq[MessageContainer] =>
      Task {
        action(messages)
        logger.debug(s"${actionType} ${messages.length} messages")
        metrics.incrementBy(s"messaging.${actionType}.success", messages.length)
      }.handle {
        // Only recover from non-fatal errors; fatal ones (e.g. OutOfMemoryError) must not be
        // logged and swallowed.
        case scala.util.control.NonFatal(e) =>
          logger.error(s"Unable to ${actionType} messages: ${e.getMessage}")
          metrics.incrementBy(s"messaging.${actionType}.error", messages.length)
      }
    }
  }

  /**
   * Consume messages from the queue. Takes in a work function that must respond with an
   * AckResponse. Operates in parallel and deals with backpressure through bounded queues.
   *
   * Reserved messages flow through a bounded queue into `work`; each result is routed to the
   * ack, reject or requeue queue, which is drained in batches into the matching sink. If `work`
   * throws a non-fatal exception, the failure is logged and the message is rejected.
   *
   * The final `.run.run` executes the stream synchronously on the calling thread.
   *
   * @param work handler invoked for every reserved message
   */
  def consume(work: (Message) => AckResponse): Unit = {
    val reserveQueue = async.boundedQueue[MessageContainer](QueueCapacity)
    val ackQueue = async.boundedQueue[MessageContainer](QueueCapacity)
    val rejectQueue = async.boundedQueue[MessageContainer](QueueCapacity)
    val requeueQueue = async.boundedQueue[MessageContainer](QueueCapacity)

    val reserveEnqueueProcess = reserveStream.to(reserveQueue.enqueue)

    val processed = reserveQueue.dequeue.evalMap { case (message, id) =>
      Task.delay {
        scala.util.Try(work(message)) match {
          case scala.util.Success(response) =>
            metrics.increment("messaging.worked.success")
            (response, message, id)
          case scala.util.Failure(e) =>
            // Record why the handler failed before rejecting; previously the exception was
            // discarded without a trace.
            logger.error("Work function failed; rejecting message", e)
            metrics.increment("messaging.worked.error")
            (Reject(), message, id)
        }
      }
    }

    // Route every processed message to the queue matching the handler's response.
    val ackResponseEnqueueProcess = processed.collect {
      case (Ack(), message, id) => ackQueue.enqueueOne((message, id))
      case (Reject(), message, id) => rejectQueue.enqueueOne((message, id))
      case (Requeue(), message, id) => requeueQueue.enqueueOne((message, id))
    }.flatMap(i => Process.eval_(i))

    val ackProcess = ackQueue.dequeueBatch(BatchSize).to(ackSink)
    val rejectProcess = rejectQueue.dequeueBatch(BatchSize).to(rejectSink)
    val requeueProcess = requeueQueue.dequeueBatch(BatchSize).to(requeueSink)

    val merged = (reserveEnqueueProcess merge ackResponseEnqueueProcess) merge (ackProcess merge rejectProcess merge requeueProcess)

    // An endless supply of the merged pipeline, of which mergeN keeps at most `concurrency` open.
    val parallelMerged = scalaz.stream.merge.mergeN(concurrency)(Process.constant(merged))
    parallelMerged.run.run
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment