Skip to content

Instantly share code, notes, and snippets.

@markglh
Last active May 5, 2017 13:26
Show Gist options
  • Save markglh/a958dbd49f22e6b2e67f25f76a3e966f to your computer and use it in GitHub Desktop.
Save markglh/a958dbd49f22e6b2e67f25f76a3e966f to your computer and use it in GitHub Desktop.
/**
  * Behaviour installed while a batch is awaiting confirmation from the event processor.
  *
  * Messages are offered to `receiveCheckpoint` first; whatever it does not handle falls through to
  * the batch handler defined below:
  *  - `EventProcessed` records the response on the collector. Once the collector reports it is done
  *    the timeout timer is cancelled, the manager is told `ProcessingComplete()` and the worker
  *    returns to `readyToProcess`. Otherwise this behaviour is re-installed (with the default
  *    `retryAttempt`), which also re-reads `latestConfirmedEventSeq` for the checkpoint handler.
  *  - `ProcessingTimeout` re-sends the unconfirmed events while `retryAttempt` is below
  *    `workerConf.failedMessageRetries`. After that the batch is either completed anyway (when the
  *    unconfirmed count is within `allowedFailures`) or reported to the manager as unsuccessful.
  *  - Any other message is stashed; stashed messages are replayed when this state is left.
  *
  * @param responseCollector collector queried for confirmed / unconfirmed events of the batch
  * @param manager           actor notified with `ProcessingComplete` when the batch ends
  * @param retryAttempt      number of timeout-triggered retries already made in this behaviour
  */
private def processing(responseCollector: ResponseCollector, manager: ActorRef, retryAttempt: Int = 0): Receive = LoggingReceive {

  // Leave the processing state, then replay everything that was stashed in the meantime.
  def backToReady(checkpointSeq: Option[CompoundSequenceNumber]) = {
    context.become(readyToProcess(checkpointSeq))
    unstashAll()
  }

  // Timeout with retries remaining: re-arm the timer, bump the attempt counter and re-send
  // every event that has not been confirmed yet.
  def retryPending(): Unit = {
    val pending = responseCollector.unconfirmedEvents
    logger.debug(s"Worker for shard $latestShardId: Timed out processing batch, retrying ${pending.size} messages")
    restartBatchProcessingTimeoutTimer()
    context.become(processing(responseCollector, manager, retryAttempt + 1))
    pending.foreach(event => eventProcessor ! ProcessEvent(event))
  }

  // Timeout with no retries remaining: either tolerate the stragglers or report the failure.
  def abandonRetries(): Unit = {
    val pending = responseCollector.unconfirmedEvents
    logger.warn(s"Worker for shard $latestShardId: Timed out processing batch, failed to confirm processing for: $pending")
    // No timer should be running at this point; cancel anyway to be certain.
    cancelBatchProcessingTimeoutTimer()
    val withinTolerance = pending.size <= allowedFailures(responseCollector.totalEventsCount)
    if (withinTolerance) {
      // Treat the whole batch as confirmed and checkpoint at its last event.
      val assumedSeq = responseCollector.latestEventSeq
      logger.warn(s"Ignoring unconfirmed messages, setting next checkpoint at $assumedSeq")
      manager ! ProcessingComplete()
      backToReady(assumedSeq)
    } else {
      logger.warn(s"Worker for shard $latestShardId: Shutting down shard worker on this node")
      eventProcessor ! ConsumerWorkerFailure(pending, latestShardId.getOrElse("-"))
      manager ! ProcessingComplete(successful = false)
      // Return to ready; whether another batch ever arrives is the manager's decision.
      backToReady(responseCollector.latestConfirmedEventSeq)
    }
  }

  // Built once per behaviour instance, so the confirmed sequence is captured at install time.
  val checkpointHandler = receiveCheckpoint(responseCollector.latestConfirmedEventSeq, Some(manager))

  val batchHandler: Receive = {
    case EventProcessed(seqNo, ok) =>
      responseCollector.receivedResponse(seqNo)
      if (!ok) {
        logger.warn(s"Worker for shard $latestShardId: Skipping message SequenceNumber '$seqNo' due to client failure")
      }
      if (!responseCollector.isDone) {
        // Still waiting on other events: re-install this behaviour.
        context.become(processing(responseCollector, manager))
      } else {
        cancelBatchProcessingTimeoutTimer()
        logger.trace(s"Worker for shard $latestShardId: successfully completed batch")
        manager ! ProcessingComplete()
        backToReady(responseCollector.latestConfirmedEventSeq)
      }

    case ProcessingTimeout =>
      if (retryAttempt < workerConf.failedMessageRetries) retryPending()
      else abandonRetries()

    case unexpected =>
      logger.info(s"Worker for shard $latestShardId: Unexpected message received by ConsumerWorker whilst in processing state, stashing: $unexpected")
      stash()
  }

  checkpointHandler.orElse(batchHandler)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment