Skip to content

Instantly share code, notes, and snippets.

@sharma-rohit
Created February 12, 2019 23:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sharma-rohit/3c023b8a29fac67b8f9f80b708104c62 to your computer and use it in GitHub Desktop.
Save sharma-rohit/3c023b8a29fac67b8f9f80b708104c62 to your computer and use it in GitHub Desktop.
import akka.actor.{Actor, ActorLogging, Props}
import akka.cluster.sharding.ClusterSharding
import com.amazonaws.services.sqs.model.MessageAttributeValue
import scala.collection.mutable
import scala.util.{Failure, Success, Try}
class SqsBatchPoller (sqsService: SqsService)
extends Actor with ActorLogging {
import SqsBatchPoller._
val shardRegion = ClusterSharding(context.system).shardRegion("shard-region")
// Actor States
private var numOfMsgsToProcess: Int = 0
private val unprocessedMessages: mutable.Set[String] = mutable.Set.empty
override def receive: Receive = {
case PollSqsBatch(numOfMsgs) =>
numOfMsgsToProcess = numOfMsgs
sqsService.poll(numOfMsgs).andThen { case msgs => self ! PolledSqsBatch(msgs) }
case PolledSqsBatch(Success(msgs)) =>
msgs.foreach { msg =>
unprocessedMessages.add(msg.id)
shardRegion ! msg
}
checkAndConclude(unprocessedMessages.isEmpty, numOfMsgsToProcess) // if all the polled msgs are invalid
case PolledSqsBatch(Failure(f)) =>
log.error(f, "Failed to poll SQS")
checkAndConclude(unprocessedMessages.isEmpty, numOfMsgsToProcess)
case SQSMessageProcessed(id) =>
unprocessedMessages.remove(id)
checkAndConclude(unprocessedMessages.isEmpty, numOfMsgsToProcess)
}
private def checkAndConclude (allMsgsProcessed: Boolean, numOfMsgsToProcess: Int): Unit = if (allMsgsProcessed) {
context.parent ! BatchProcessed(numOfMsgsToProcess)
context.stop(self)
}
}
object SqsBatchPoller {
def name (suffix: String) = s"sqs-batch-processor-$suffix"
case class SqsMessage(id: String, body: String, handle: String, attributes: Map[String, MessageAttributeValue])
// Actor Messages
case class SQSMessageProcessed (id: String)
case class PollSqsBatch (numOfMsgsToPoll: Int)
case class BatchProcessed (numOfMsgsToPoll: Int)
case class PolledSqsBatch (msgs: Try[Vector[SqsMessage]])
def props (sqsService: SqsService): Props = Props(new SqsBatchPoller(sqsService))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment