Skip to content

Instantly share code, notes, and snippets.

What would you like to do?
import{Actor, ActorLogging, Props}
import akka.cluster.sharding.ClusterSharding
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
class SqsBatchPoller (sqsService: SqsService)
extends Actor with ActorLogging {
import SqsBatchPoller._
val shardRegion = ClusterSharding(context.system).shardRegion("shard-region")
// Actor States
private var numOfMsgsToProcess: Int = 0
private val unprocessedMessages: mutable.Set[String] = mutable.Set.empty
override def receive: Receive = {
case PollSqsBatch(numOfMsgs) =>
numOfMsgsToProcess = numOfMsgs
sqsService.poll(numOfMsgs).andThen { case msgs => self ! PolledSqsBatch(msgs) }
case PolledSqsBatch(Success(msgs)) =>
msgs.foreach { msg =>
shardRegion ! msg
checkAndConclude(unprocessedMessages.isEmpty, numOfMsgsToProcess) // if all the polled msgs are invalid
case PolledSqsBatch(Failure(f)) =>
log.error(f, "Failed to poll SQS")
checkAndConclude(unprocessedMessages.isEmpty, numOfMsgsToProcess)
case SQSMessageProcessed(id) =>
checkAndConclude(unprocessedMessages.isEmpty, numOfMsgsToProcess)
private def checkAndConclude (allMsgsProcessed: Boolean, numOfMsgsToProcess: Int): Unit = if (allMsgsProcessed) {
context.parent ! BatchProcessed(numOfMsgsToProcess)
object SqsBatchPoller {
def name (suffix: String) = s"sqs-batch-processor-$suffix"
case class SqsMessage(id: String, body: String, handle: String, attributes: Map[String, MessageAttributeValue])
// Actor Messages
case class SQSMessageProcessed (id: String)
case class PollSqsBatch (numOfMsgsToPoll: Int)
case class BatchProcessed (numOfMsgsToPoll: Int)
case class PolledSqsBatch (msgs: Try[Vector[SqsMessage]])
def props (sqsService: SqsService): Props = Props(new SqsBatchPoller(sqsService))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.