Skip to content

Instantly share code, notes, and snippets.

@sharma-rohit
Created February 12, 2019 23:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sharma-rohit/3090717c52397a8de517be2655a076e1 to your computer and use it in GitHub Desktop.
Save sharma-rohit/3090717c52397a8de517be2655a076e1 to your computer and use it in GitHub Desktop.
import java.util.UUID
import akka.actor.{Actor, Props}
import scala.annotation.tailrec
/**
 * Actor that decides when, and how much, to poll from SQS.
 *
 * It keeps a running count of messages that have been requested but not yet
 * reported back as processed. While that count is below `maxUnprocessedMessages`
 * it spawns one child `SqsBatchPoller` per batch size returned by `howMuchToPoll`
 * and asks each child to poll that many messages.
 *
 * Handled messages:
 *  - `StartPollingSQS`: switches polling on (only the first time) and triggers a poll.
 *  - `PollSqs`: spawns pollers if polling is on and there is spare capacity.
 *  - `BatchProcessed(n)`: gives back `n` units of capacity and triggers another poll.
 *
 * @param sqsService handed unchanged to every spawned `SqsBatchPoller`
 */
class SqsBatchPollerManager(sqsService: SqsService) extends Actor {
  import SqsBatchPollerManager._
  import SqsBatchPoller.{BatchProcessed, PollSqsBatch}

  // Upper bound on messages requested but not yet acknowledged via BatchProcessed.
  private val maxUnprocessedMessages = 1000
  // Maximum number of pollers started by a single PollSqs message.
  private val parallelism = 4
  // Maximum number of messages requested from a single poller.
  private val batchSize = 10

  // actor states
  private var pollingStarted: Boolean = false
  private var unprocessedMessagesCount: Int = 0

  override def receive: Receive = {
    case StartPollingSQS                => startPollingOnce()
    case PollSqs                        => dispatchPollers()
    case BatchProcessed(processedCount) => releaseCapacity(processedCount)
  }

  /** Turns polling on and kicks off the first poll; repeated start requests are ignored. */
  private def startPollingOnce(): Unit =
    if (!pollingStarted) {
      pollingStarted = true
      self ! PollSqs
    }

  /** Reserves capacity for, and starts, one poller per batch size that currently fits. */
  private def dispatchPollers(): Unit = {
    val hasSpareCapacity = unprocessedMessagesCount < maxUnprocessedMessages
    if (pollingStarted && hasSpareCapacity) {
      val plannedBatches =
        howMuchToPoll(parallelism, batchSize, maxUnprocessedMessages, unprocessedMessagesCount)
      for (plannedSize <- plannedBatches) {
        // Reserve the capacity before the poller is started.
        unprocessedMessagesCount += plannedSize
        pollSqsBatch(plannedSize)
      }
    }
  }

  /** Returns the capacity held by a finished batch and schedules the next poll. */
  private def releaseCapacity(processedCount: Int): Unit = {
    unprocessedMessagesCount -= processedCount
    if (pollingStarted) {
      self ! PollSqs
    }
  }

  /** Creates a uniquely named child poller and asks it to poll `numOfMsgsToPoll` messages. */
  private def pollSqsBatch(numOfMsgsToPoll: Int): Unit = {
    val pollerProps = SqsBatchPoller.props(sqsService)
    val pollerName = SqsBatchPoller.name(UUID.randomUUID().toString)
    context.actorOf(pollerProps, pollerName) ! PollSqsBatch(numOfMsgsToPoll)
  }
}
/** Messages, factory and batch-planning helper for [[SqsBatchPollerManager]]. */
object SqsBatchPollerManager {
  // Actor Messages
  case object StartPollingSQS
  case object PollSqs

  /**
   * Builds the `Props` used to create a [[SqsBatchPollerManager]].
   *
   * @param sqsService service passed to the manager's constructor
   * @param mat        not used by this method's body; kept so existing callers keep compiling
   */
  def props(sqsService: SqsService)(implicit mat: Materializer): Props =
    Props(new SqsBatchPollerManager(sqsService))

  /**
   * Splits the remaining capacity (`maxUnprocessedMessages - unprocessedMessages`)
   * into batch sizes to request now.
   *
   * The result holds at most `maxParallelism` entries. Every entry equals
   * `batchSize`, except possibly the last one, which carries whatever capacity
   * is left when that is smaller than a full batch. For example, with
   * `maxParallelism = 4`, `batchSize = 10` and 35 units of capacity the result
   * is `List(10, 10, 10, 5)`.
   *
   * @param maxParallelism         maximum number of batches to return
   * @param batchSize              maximum size of a single batch
   * @param maxUnprocessedMessages capacity ceiling
   * @param unprocessedMessages    capacity already in use
   * @return the batch sizes in request order; empty when there is no spare
   *         capacity, or when `maxParallelism` or `batchSize` is not positive
   */
  def howMuchToPoll(maxParallelism: Int,
                    batchSize: Int,
                    maxUnprocessedMessages: Int,
                    unprocessedMessages: Int): List[Int] = {
    // Batches are prepended and the list reversed once at the end, and a slot
    // counter replaces repeated size checks; both avoid O(n) work per step on a List.
    @tailrec
    def loop(remaining: Int, slotsLeft: Int, acc: List[Int]): List[Int] =
      if (slotsLeft <= 0 || remaining <= 0) acc.reverse
      else {
        val batch = math.min(remaining, batchSize)
        loop(remaining - batch, slotsLeft - 1, batch :: acc)
      }

    // A non-positive batch size can never consume capacity; without this guard
    // the result would be filled with zero or negative sized batches.
    if (batchSize <= 0) Nil
    else loop(maxUnprocessedMessages - unprocessedMessages, maxParallelism, Nil)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment