Skip to content

Instantly share code, notes, and snippets.

@ndemengel
Last active April 26, 2020 13:15
Show Gist options
  • Save ndemengel/fc973c03bb909b14f95c136d101352dc to your computer and use it in GitHub Desktop.
Save ndemengel/fc973c03bb909b14f95c136d101352dc to your computer and use it in GitHub Desktop.
Command queue: core logic
/**
 * Core logic of the command queue.
 *
 * This is an excerpt: constructor parameters and the remaining members are elided (`...`).
 * [schedule] wraps a command specification into a [ScheduledTask] and hands it to the
 * two-argument `schedule` overload; [processCommands] tries to lock one task from the
 * repository and decides what to do with it.
 */
class CommandQueue(...) {
    // ...

    /**
     * Schedules [command] on this queue.
     *
     * The task's execution date is obtained from the execution policy with `tries = 0`,
     * and the command's own `deduplicate` flag is forwarded unchanged.
     */
    override fun schedule(command: CommandSpecification) {
        val newTask = ScheduledTask(
            command,
            queueName,
            clock,
            // the scheduled execution date is the important part for deduplication to work
            scheduledExecutionDate = executionPolicy.computeNextExecutionDate(clock, tries = 0)
        )
        // the overload below adds the task to the queue, logs details and emits metrics
        schedule(newTask, command.deduplicate)
    }

    // ...

    /**
     * Tries to process a single task.
     *
     * @return `false` when the circuit breaker is open or when the repository yields no task;
     * otherwise the `commandExecuted` flag of the execution result.
     */
    override fun processCommands(): Boolean {
        val breaker = circuitBreaker()
        if (breaker.isOpen()) {
            return false
        }

        val lockedTask = taskRepository.tryLockingTaskWithEarliestScheduleOlderThan(
            queueName,
            LocalDateTime.now(clock)
        ) ?: return false

        val registeredCommand = commandRegistry.get(lockedTask.commandName)

        // Branch order matters: the first matching condition wins.
        // Each branch decides what to do with the task, logs details and emits metrics.
        val outcome = when {
            registeredCommand == null -> commandNotFound(lockedTask)
            violatesRateLimit(lockedTask.weight) -> rateLimited(lockedTask)
            !running -> aborted(lockedTask)
            else -> executeCommand(registeredCommand, lockedTask)
        }

        registerCall(breaker, outcome)
        // removes the task, or moves it to quarantine, or updates its number of tries
        handleExecutionResult(outcome)
        return outcome.commandExecuted
    }

    // ...
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment