-
-
Save sharma-rohit/3090717c52397a8de517be2655a076e1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.UUID | |
import akka.actor.{Actor, Props} | |
import scala.annotation.tailrec | |
class SqsBatchPollerManager(sqsService: SqsService) extends Actor { | |
import SqsBatchPollerManager._ | |
import SqsBatchPoller.{BatchProcessed, PollSqsBatch} | |
private val maxUnprocessedMessages = 1000 | |
private val parallelism = 4 | |
private val batchSize = 10 | |
//actor states | |
private var pollingStarted: Boolean = false | |
private var unprocessedMessagesCount: Int = 0 | |
override def receive: Receive = { | |
case StartPollingSQS => | |
if (!pollingStarted) { | |
pollingStarted = true | |
self ! PollSqs | |
} | |
case PollSqs => | |
if(pollingStarted && shouldPoll(unprocessedMessagesCount)) { | |
howMuchToPoll(parallelism, batchSize, maxUnprocessedMessages, unprocessedMessagesCount).foreach {numOfMsgsToPoll => | |
unprocessedMessagesCount += numOfMsgsToPoll | |
pollSqsBatch(numOfMsgsToPoll) | |
} | |
} | |
case BatchProcessed(numOfMsgsToPoll) => | |
unprocessedMessagesCount -= numOfMsgsToPoll | |
if(pollingStarted) { self ! PollSqs } | |
} | |
private def pollSqsBatch(numOfMsgsToPoll: Int): Unit = { | |
val sqsBatchProcessor = context.actorOf( | |
SqsBatchPoller.props(sqsService), | |
SqsBatchPoller.name(UUID.randomUUID().toString) | |
) | |
sqsBatchProcessor ! PollSqsBatch(numOfMsgsToPoll) | |
} | |
private def shouldPoll(unprocessedMessagesCount: Int): Boolean = { unprocessedMessagesCount < maxUnprocessedMessages } | |
} | |
object SqsBatchPollerManager { | |
// Actor Messages | |
case object StartPollingSQS | |
case object PollSqs | |
def props (sqsService: SqsService)(implicit mat: Materializer): Props = Props(new SqsBatchPollerManager(sqsService)) | |
def howMuchToPoll (maxParallelism: Int, | |
batchSize: Int, | |
maxUnprocessedMessages: Int, | |
unprocessedMessages: Int | |
): List[Int] = { | |
@tailrec | |
def loop (n: Int, result: List[Int] = Nil): List[Int] = n match { | |
case i if result.size >= maxParallelism || i <= 0 => result | |
case i if i < batchSize => result :+ i | |
case i => loop(i - batchSize, result :+ batchSize) | |
} | |
val numOfMsgsToPoll = maxUnprocessedMessages - unprocessedMessages | |
loop(numOfMsgsToPoll) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment