-
-
Save sharma-rohit/3c023b8a29fac67b8f9f80b708104c62 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{Actor, ActorLogging, Props} | |
import akka.cluster.sharding.ClusterSharding | |
import com.amazonaws.services.sqs.model.MessageAttributeValue | |
import scala.collection.mutable | |
import scala.util.{Failure, Success, Try} | |
class SqsBatchPoller (sqsService: SqsService) | |
extends Actor with ActorLogging { | |
import SqsBatchPoller._ | |
val shardRegion = ClusterSharding(context.system).shardRegion("shard-region") | |
// Actor States | |
private var numOfMsgsToProcess: Int = 0 | |
private val unprocessedMessages: mutable.Set[String] = mutable.Set.empty | |
override def receive: Receive = { | |
case PollSqsBatch(numOfMsgs) => | |
numOfMsgsToProcess = numOfMsgs | |
sqsService.poll(numOfMsgs).andThen { case msgs => self ! PolledSqsBatch(msgs) } | |
case PolledSqsBatch(Success(msgs)) => | |
msgs.foreach { msg => | |
unprocessedMessages.add(msg.id) | |
shardRegion ! msg | |
} | |
checkAndConclude(unprocessedMessages.isEmpty, numOfMsgsToProcess) // if all the polled msgs are invalid | |
case PolledSqsBatch(Failure(f)) => | |
log.error(f, "Failed to poll SQS") | |
checkAndConclude(unprocessedMessages.isEmpty, numOfMsgsToProcess) | |
case SQSMessageProcessed(id) => | |
unprocessedMessages.remove(id) | |
checkAndConclude(unprocessedMessages.isEmpty, numOfMsgsToProcess) | |
} | |
private def checkAndConclude (allMsgsProcessed: Boolean, numOfMsgsToProcess: Int): Unit = if (allMsgsProcessed) { | |
context.parent ! BatchProcessed(numOfMsgsToProcess) | |
context.stop(self) | |
} | |
} | |
object SqsBatchPoller { | |
def name (suffix: String) = s"sqs-batch-processor-$suffix" | |
case class SqsMessage(id: String, body: String, handle: String, attributes: Map[String, MessageAttributeValue]) | |
// Actor Messages | |
case class SQSMessageProcessed (id: String) | |
case class PollSqsBatch (numOfMsgsToPoll: Int) | |
case class BatchProcessed (numOfMsgsToPoll: Int) | |
case class PolledSqsBatch (msgs: Try[Vector[SqsMessage]]) | |
def props (sqsService: SqsService): Props = Props(new SqsBatchPoller(sqsService)) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment