RabbitMQ Time Delay Queue

Time Delay Queue

One of the things that I was sure must exist, but had never found anywhere until this point, was a distributed time delay. That is, a way to add data to a bucket/queue/dataset/what have you, and then do work on that data after a pre-determined delay, where the delay is independent for each item. (The ability to set a different delay on each item automatically allows for the same delay on every item.)

When working in a single service, using a sleep command is often good enough, but when you have to wait longer than about a minute, you should be using a separate (ideally distributed) tool.

The reason for this, in my opinion, is that you should not trust your service to stay running between the moment the delay starts and the moment it ends.

This gist explains, roughly, how to create a time delay queue in RabbitMQ (and thus AMQP in general), and includes the Terraform code required to do so.

This work is built from/inspired by a talk by @adamralph called Where we're going... we don't need batch jobs

Existing Extension

There is an existing extension to RabbitMQ that, in theory, already allows this. The recommendation with that extension, however, is not to delay messages by more than a day or two, and it is not a supported extension, which adds friction around installing and maintaining it.

The goal of this gist is to allow you to achieve the same result using only RabbitMQ's built-in features.

Why Terraform

Given the large number of bindings required for a time delay queue (roughly log(max_delay, 2)^2 / 2), it is imperative that you construct this queue set using Infrastructure as Code in some form. Building it by hand is only really feasible for systems where an in-program delay would have been acceptable anyway.
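As a ballpark (a quick Python sketch, not part of the module; the numbers are only estimates), the default one-week max delay works out to 20 queues and roughly 230 bindings:

  import math

  def sizing(max_delay_seconds: int) -> tuple[int, int]:
      """Rough queue/binding counts for a given maximum delay (illustrative only)."""
      queues = math.ceil(math.log2(max_delay_seconds + 1))  # one queue per power of 2
      # each done exchange binds to every shorter queue plus the done queue,
      # and the ingest exchange binds to every queue: roughly log2(max)^2 / 2 total
      bindings = queues * (queues + 1) // 2 + queues
      return queues, bindings

  print(sizing(60 * 60 * 24 * 7))  # one week -> (20, 230)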

The Terraform should be a drop-in module, but this document also explains how to construct the bindings so that you can reproduce them in the IaC tool of your choice.

Dead letter queues

The core of this design is a series of dead-letter exchanges and queues with per-queue message TTLs. There is one queue for each power of 2 needed to cover the maximum delay time. Combining these queues in the correct order (following the binary representation of the desired delay) allows any message to be delayed by any number of seconds. (A minor change would allow delays of any number of milliseconds; however, the accuracy of this system is +/- 1 second, so that precise a delay is not the best idea.)

Steps

  1. Messages are published to an ingest exchange.
  2. The ingest exchange sends the message to one of N time-delay queues.
  3. The message sits in the time-delay queue until its time in the queue surpasses that queue's TTL.
  4. The queue dead-letters the message to the matching time-delay-done exchange.
  5. This exchange, in the same manner as step 2, sends the message to the next required time-delay queue.
  6. Steps 3-5 repeat until there are no more queues the message needs to be delayed in.
  7. The message is sent to the "done" queue to be consumed.
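As a concrete example (a quick Python sketch, not part of the Terraform), a 13 second delay decomposes into 8 + 4 + 1, so the message waits in the 8, 4, and 1 second queues in that order before landing in the done queue:

  def queues_visited(delay_seconds: int) -> list[int]:
      """Return the delay queues a message passes through, largest TTL first."""
      queues = []
      bit = 1 << (delay_seconds.bit_length() - 1) if delay_seconds else 0
      while bit:
          if delay_seconds & bit:
              queues.append(bit)
          bit >>= 1
      return queues

  print(queues_visited(13))  # [8, 4, 1] -> total wait of 13 seconds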

Connecting the queues

At a glance, it is difficult to see how to implement the web of bindings.

The first key is to examine each exchange independently. The exchange attached to a queue delaying for N seconds only requires bindings to queues that delay for less than N seconds.

The second difficulty is sending the message to exactly one queue. To solve this, note that the next queue we want the message to go to is the one with the largest remaining delay (since messages can only move on to queues with shorter delays). In other words, a message should only go to a given queue if it matched no queue with a larger time delay.

The method of creating these bindings is to use topic exchanges and to encode the desired delay in the message's routing key as dot-separated binary digits (e.g. 0.1.1.0.1). Then, using an assortment of single-digit wildcards (*) and full-match wildcards (#), messages can be matched appropriately.
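A minimal sketch of that encoding and of publishing to the ingest exchange (Python with the pika client is purely illustrative here, and mydelay.ingest is a stand-in for whatever "${name}.ingest" resolves to in your setup):

  import pika  # assumed AMQP client; any client that lets you set a routing key works

  def delay_to_routing_key(delay_seconds: int, digits: int) -> str:
      """Encode a delay as dot-separated binary digits, e.g. 13 with 5 digits -> '0.1.1.0.1'."""
      return ".".join(format(delay_seconds, f"0{digits}b"))

  # extra leading zeros are harmless: every binding key starts with "#"
  connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
  channel = connection.channel()
  channel.basic_publish(
      exchange="mydelay.ingest",                # stand-in for the "${name}.ingest" exchange
      routing_key=delay_to_routing_key(13, 5),  # "0.1.1.0.1" -> 8 + 4 + 1 seconds
      body=b"do this in 13 seconds",
  )
  connection.close()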

Binding Keys

Keys are always of the form: a #, zero or more 0s, at most one 1, and zero or more *s, with all characters separated by dots.

Next step is another delay queue

To determine the keys leaving a given queue's done exchange, take the number of digits that still matter (one fewer than the digit position of the queue currently being used) and use that as the combined count of 0s, the single 1, and *s. Start with no 0s, and add one 0 for each queue that gets skipped.

This produces the following keys from the 8.done exchange

  • #.1.*.* to send to the 4 queue
  • #.0.1.* to send to the 2 queue
  • #.0.0.1 to send to the 1 queue
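The same construction, expressed as a quick Python sketch (mirroring the keyPairings local in the Terraform below), generates those keys for the 8.done exchange:

  def next_step_keys(src: int) -> dict[int, str]:
      """Binding keys from the done exchange of queue 2^src to every shorter delay queue."""
      keys = {}
      for dest in range(src - 1, -1, -1):       # only queues with shorter delays
          zeros = ["0"] * (src - 1 - dest)      # one 0 per skipped queue
          stars = ["*"] * dest                  # remaining, not-yet-relevant digits
          keys[2 ** dest] = ".".join(["#"] + zeros + ["1"] + stars)
      return keys

  print(next_step_keys(src=3))
  # {4: '#.1.*.*', 2: '#.0.1.*', 1: '#.0.0.1'}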

Ingest exchange

The ingest exchange's bindings are mostly no different from the delay-queue keys; the only difference is the length of the keys. To determine the length, you just need to make sure the keys are long enough to reach the longest delay queue.

Should the queues be set up to have a max delay of 15 seconds, the following bindings would exist

  • #.1.*.*.* to send to the 8 queue
  • #.0.1.*.* to send to the 4 queue
  • #.0.0.1.* to send to the 2 queue
  • #.0.0.0.1 to send to the 1 queue
  • #.0.0.0.0 to send directly to the done queue (this is optional and not included in the Terraform, but should be done to make sure no messages are dropped silently)

(Though effectively no different, these bindings have been separated out from the next-step keys in the Terraform for clarity.)

Next step is the done queue

For the done keys, the message must have no more delays to go through. Thus it is all 0s for the remaining digits, producing the following keys.

  • #.0.0.0 when in the 8.done exchange
  • #.0.0 when in the 4.done exchange
  • #.0 when in the 2.done exchange
  • # when in the 1.done exchange

See the following diagram for how the keys are all created.

  sequenceDiagram
      Ingest -->> Q8: #35;.1.*.*.*
      Ingest -->> Q4: #35;.0.1.*.*
      Ingest -->> Q2: #35;.0.0.1.*
      Ingest -->> Q1: #35;.0.0.0.1
  
      Q8 -->> Q4: #35;.1.*.*
      Q8 -->> Q2: #35;.0.1.*
      Q8 -->> Q1: #35;.0.0.1
      Q8 -->> Done: #35;.0.0.0
  
      Q4 -->> Q2: #35;.1.*
      Q4 -->> Q1: #35;.0.1
      Q4 -->> Done: #35;.0.0
  
  
      Q2 -->> Q1: #35;.1
      Q2 -->> Done: #35;.0
  
      Q1 -->> Done: #35;
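The whole binding table in the diagram (ingest, next-step, and done keys) can be reproduced with a short Python sketch; this is purely illustrative, and the Terraform below remains the source of truth:

  def all_bindings(num_queues: int) -> list[tuple[str, str, str]]:
      """(source, destination, key) triples for ingest, next-step, and done bindings."""
      bindings = []
      # ingest exchange -> first delay queue (full-length keys)
      for dest in range(num_queues - 1, -1, -1):
          key = ".".join(["#"] + ["0"] * (num_queues - 1 - dest) + ["1"] + ["*"] * dest)
          bindings.append(("Ingest", f"Q{2 ** dest}", key))
      # done exchange of each queue -> shorter queues, then the done queue
      for src in range(num_queues - 1, -1, -1):
          for dest in range(src - 1, -1, -1):
              key = ".".join(["#"] + ["0"] * (src - 1 - dest) + ["1"] + ["*"] * dest)
              bindings.append((f"Q{2 ** src}", f"Q{2 ** dest}", key))
          bindings.append((f"Q{2 ** src}", "Done", ".".join(["#"] + ["0"] * src)))
      return bindings

  for source, destination, key in all_bindings(4):
      print(f"{source} -> {destination}: {key}")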

## TERRAFORM
terraform {
  required_providers {
    rabbitmq = {
      source  = "cyrilgdn/rabbitmq"
      version = "~> 1.6.0"
    }
  }
}
## INPUTS
variable "name" {
  description = "the name to use for queues"
  type        = string
}
variable "max_delay" {
  description = "the maximum delay in seconds that is desired. Default ~ 1 week. This is used to calculate the number of queues that are required"
  type        = number
  default     = 60 * 60 * 24 * 7
}
variable "vhost" {
  description = "the vhost to create this time delay queue for"
  type        = string
}
# LOCAL GENERATION
locals {
  name              = var.name
  desiredMaxSeconds = var.max_delay
  vhost             = var.vhost
}
locals {
  # Calculate the required number of time delay queues
  idxMax = ceil(log(local.desiredMaxSeconds + 1, 2))
  # create the array of indexes for the queues
  indexes = range(0, local.idxMax)
  # calculate the seconds to delay for each queue (powers of 2).
  # Requires a hack (the lower() call) to get a set of string, as Terraform interprets a quoted number as a number directly
  powers = toset([for x in local.indexes : lower("${pow(2, x)}")])
  # delays = toset([for x in local.powers : "${x * 1000}"])
  # create the keys for matching against the ingest exchange to get the message into its first queue.
  ingestKeyPairings = { for dest in local.indexes : "${local.idxMax}-${dest}" => {
    destP = lower("${pow(2, dest)}"),
    key = join(".", flatten([
      ["#"],
      [for x in range(0, dest - local.idxMax + 1) : "0"],
      ["1"],
      [for x in range(0, dest) : "*"],
    ]))
    }
  }
  # Create the main body queue keys that take messages from one delay queue to another
  keyPairings = merge([for src in local.indexes : {
    for dest in local.indexes : "${src}-${dest}" => {
      srcP  = lower("${pow(2, src)}"),
      destP = lower("${pow(2, dest)}"),
      # src = src,
      # dest = dest,
      key = join(".", flatten([
        ["#"],
        [for x in range(0, src - 1 - dest) : "0"],
        ["1"],
        [for x in range(0, dest) : "*"],
      ]))
      # a #, some number of 0, one 1, some number of *
    } if dest < src
  }]...)
  # create the keys to take messages from any given queue to the destination queue.
  endKeyPairings = { for src in local.indexes : "0-${src}" => {
    srcP  = lower("${pow(2, src)}"),
    destP = "0",
    # src = src,
    # dest = dest,
    key = join(".", flatten([
      ["#"],
      [for x in range(0, src) : "0"],
    ]))
    }
  }
}
# RESOURCES
# where the data first goes (ingest exchange)
resource "rabbitmq_exchange" "ingest" {
  name  = "${local.name}.ingest"
  vhost = local.vhost
  settings {
    auto_delete = false
    durable     = true
    type        = "topic"
  }
}
# the queue that a message is sent to to wait until it is dead-lettered.
# has a TTL to cause the message to immediately go to the matching "done" exchange when its time is up.
resource "rabbitmq_queue" "delay" {
  name     = "${local.name}.delay.${each.value}"
  for_each = local.powers
  vhost    = local.vhost
  settings {
    durable     = true
    auto_delete = false
    arguments_json = jsonencode({
      "x-message-ttl"          = parseint(each.value, 10) * 1000
      "x-dead-letter-exchange" = "${local.name}.delay.${each.value}.done"
    })
  }
}
# the exchanges that messages are DLQed to, which then route them to the next stage.
# One exchange exists for each delay time.
resource "rabbitmq_exchange" "delay" {
  name     = "${local.name}.delay.${each.value}.done"
  for_each = local.powers
  vhost    = local.vhost
  settings {
    auto_delete = false
    durable     = true
    type        = "topic"
  }
}
# the queue that data ends up in when done.
resource "rabbitmq_queue" "output" {
  name  = "${local.name}.done"
  vhost = local.vhost
  settings {
    durable     = true
    auto_delete = false
    # arguments_json = jsonencode({
    #   "x-queue-type"           = "quorum" # Makes the queue HA.
    #   "x-dead-letter-strategy" = "at-least-once"
    #   "x-delivery-limit"       = 5 # Used to allow for retries in the done-queue.
    #   "x-dead-letter-exchange" = "${local.name}.done.dead" # Destination exchange should the retries fail.
    # })
  }
}
## BINDINGS
# bind ingest exchange to the first queue that the data is delayed in.
resource "rabbitmq_binding" "ingest_bindingss" {
  depends_on = [rabbitmq_exchange.delay, rabbitmq_exchange.ingest, rabbitmq_queue.delay, rabbitmq_queue.output]
  for_each   = local.ingestKeyPairings

  source           = "${local.name}.ingest"
  routing_key      = each.value.key
  vhost            = local.vhost
  destination      = "${local.name}.delay.${each.value.destP}"
  destination_type = "queue"
}
# bind the data from a completed-exchange to the next queue
resource "rabbitmq_binding" "bindings" {
  depends_on = [rabbitmq_exchange.delay, rabbitmq_exchange.ingest, rabbitmq_queue.delay, rabbitmq_queue.output]
  for_each   = local.keyPairings

  source           = "${local.name}.delay.${each.value.srcP}.done"
  routing_key      = each.value.key
  vhost            = local.vhost
  destination      = "${local.name}.delay.${each.value.destP}"
  destination_type = "queue"
}
# bind the data from a completed-exchange to the output queue if it has no more delay to do.
resource "rabbitmq_binding" "complete_bindingss" {
  depends_on = [rabbitmq_exchange.delay, rabbitmq_exchange.ingest, rabbitmq_queue.delay, rabbitmq_queue.output]
  for_each   = local.endKeyPairings

  source           = "${local.name}.delay.${each.value.srcP}.done"
  routing_key      = each.value.key
  vhost            = local.vhost
  destination      = "${local.name}.done"
  destination_type = "queue"
}
# DONE QUEUE DLQ
# Optional dead letter exchange for any messages that fail in the done queue.
#resource "rabbitmq_exchange" "output_dead" {
#  name     = "${local.name}.done.dead"
#  for_each = local.powers
#  vhost    = local.vhost
#  settings {
#    auto_delete = false
#    durable     = true
#    type        = "fanout"
#  }
#}
# the dead letter queue for the done queue.
#resource "rabbitmq_queue" "output_dead" {
#  name  = "${local.name}.done.dead"
#  vhost = local.vhost
#  settings {
#    durable     = true
#    auto_delete = false
#  }
#}
# bind dlx to dlq
#resource "rabbitmq_binding" "dead_binding" {
#  depends_on = [rabbitmq_exchange.delay, rabbitmq_exchange.ingest, rabbitmq_queue.delay, rabbitmq_queue.output_dead, rabbitmq_queue.output_dead]
#
#  source           = "${local.name}.done.dead"
#  routing_key      = "#"
#  vhost            = local.vhost
#  destination      = "${local.name}.done.dead"
#  destination_type = "queue"
#}