Last active
May 5, 2017 13:26
-
-
Save markglh/a958dbd49f22e6b2e67f25f76a3e966f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
  * Behaviour used while a batch of events is in flight.
  *
  * Messages accepted by `receiveCheckpoint` are tried first; everything else is handled here:
  *
  *  - `EventProcessed`: the response is recorded in the collector. Once the collector reports
  *    that it is done, the timeout timer is cancelled, the manager is sent `ProcessingComplete()`
  *    and the worker returns to the ready state.
  *  - `ProcessingTimeout` while `retryAttempt` is below `workerConf.failedMessageRetries`:
  *    the timer is restarted and every still-unconfirmed event is sent to the processor again.
  *  - `ProcessingTimeout` once retries are used up: either the unconfirmed events are ignored
  *    (when their count is within `allowedFailures`) or the failure is reported to both the
  *    event processor and the manager. In both cases the worker returns to the ready state.
  *  - Any other message is logged and stashed until the worker leaves this state.
  *
  * @param responseCollector tracks which events of the current batch have been confirmed
  * @param manager           actor that is told when the batch has completed (or failed)
  * @param retryAttempt      number of timeout-triggered retries already performed in this state
  * @return the receive function for the processing state, wrapped in `LoggingReceive`
  */
private def processing(responseCollector: ResponseCollector, manager: ActorRef, retryAttempt: Int = 0): Receive = LoggingReceive {

  // Leaves the processing state; stashed messages are replayed only after the switch.
  def resumeReady(checkpointSeq: Option[CompoundSequenceNumber]): Unit = {
    context.become(readyToProcess(checkpointSeq))
    unstashAll()
  }

  // Every event in the batch has been answered: stop the timer and report success.
  def completeBatch(): Unit = {
    cancelBatchProcessingTimeoutTimer()
    logger.trace(s"Worker for shard $latestShardId: successfully completed batch")
    manager ! ProcessingComplete()
    resumeReady(responseCollector.latestConfirmedEventSeq)
  }

  // Timed out with retries left: re-arm the timer, bump the attempt counter, then resend.
  def retryUnconfirmed(): Unit = {
    val unconfirmedResponses = responseCollector.unconfirmedEvents
    logger.debug(s"Worker for shard $latestShardId: Timed out processing batch, retrying ${unconfirmedResponses.size} messages")
    restartBatchProcessingTimeoutTimer()
    context.become(processing(responseCollector, manager, retryAttempt + 1))
    unconfirmedResponses.foreach(event => eventProcessor ! ProcessEvent(event))
  }

  // Timed out with no retries left: tolerate the loss if it is small enough, otherwise fail.
  def abandonBatch(): Unit = {
    val unconfirmedResponses = responseCollector.unconfirmedEvents
    logger.warn(s"Worker for shard $latestShardId: Timed out processing batch, failed to confirm processing for: $unconfirmedResponses")
    // Defensive: make sure no further timeout fires for this batch.
    cancelBatchProcessingTimeoutTimer()
    val withinTolerance = unconfirmedResponses.size <= allowedFailures(responseCollector.totalEventsCount)
    if (withinTolerance) {
      // Act as though the whole batch was confirmed, so the next checkpoint covers all of it.
      val latestSeq = responseCollector.latestEventSeq
      logger.warn(s"Ignoring unconfirmed messages, setting next checkpoint at $latestSeq")
      manager ! ProcessingComplete()
      resumeReady(latestSeq)
    } else {
      logger.warn(s"Worker for shard $latestShardId: Shutting down shard worker on this node")
      eventProcessor ! ConsumerWorkerFailure(unconfirmedResponses, latestShardId.getOrElse("-"))
      manager ! ProcessingComplete(successful = false)
      // Back to ready regardless; whether another batch is ever sent is the manager's decision.
      resumeReady(responseCollector.latestConfirmedEventSeq)
    }
  }

  receiveCheckpoint(responseCollector.latestConfirmedEventSeq, Some(manager)).orElse {
    case EventProcessed(sequenceNo, successful) =>
      responseCollector.receivedResponse(sequenceNo)
      if (!successful) {
        logger.warn(s"Worker for shard $latestShardId: Skipping message SequenceNumber '$sequenceNo' due to client failure")
      }
      if (!responseCollector.isDone) {
        // Re-install the behaviour: this rebuilds the checkpoint handler from the collector's
        // current state (assuming receiveCheckpoint takes its arguments by value - TODO confirm)
        // and starts again from the default retryAttempt of 0.
        context.become(processing(responseCollector, manager))
      } else {
        completeBatch()
      }

    case ProcessingTimeout =>
      if (retryAttempt < workerConf.failedMessageRetries) retryUnconfirmed()
      else abandonBatch()

    case m =>
      logger.info(s"Worker for shard $latestShardId: Unexpected message received by ConsumerWorker whilst in processing state, stashing: $m")
      stash()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment