@matthiasr
Last active May 9, 2022 14:45
Notes on operating cluster queues

Someone asked in chat (paraphrased) "I am coming from synchronous online systems, and am now going to work on a queue-based system. What should I be aware of?" This is a (somewhat freely associated) collection of thoughts, based on my experience at SoundCloud.

often you have queues feeding queues, and sometimes this includes fanout. a canonical example is putting one post into every follower's inbox/news feed/timeline. I think backpressure between the incoming and fanned out queues can be very helpful because you'd rather hold on to the one incoming message than the million outgoing ones.
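
For example, with RabbitMQ you can get this backpressure fairly cheaply: keep the consumer prefetch on the incoming queue at 1 and use publisher confirms for the fanned-out messages, so the worker never picks up the next post before the outgoing copies have been accepted by the broker. A minimal sketch with pika; the queue naming scheme and the `follower_ids` lookup are made up for illustration:

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# Hold only one incoming post at a time: the broker keeps the rest,
# which is much cheaper than us holding the fanned-out copies.
channel.basic_qos(prefetch_count=1)

# Publisher confirms: basic_publish blocks until the broker accepts the
# outgoing message, so a slow or overloaded broker slows the fanout down
# instead of letting an unbounded pile of copies build up.
channel.confirm_delivery()

def follower_ids(post):
    return []  # stand-in for the real "who follows this user" lookup

def fan_out(ch, method, properties, body):
    for follower in follower_ids(body):
        ch.basic_publish(
            exchange="",
            routing_key="inbox.%s" % follower,  # hypothetical per-follower queue
            body=body,
            properties=pika.BasicProperties(delivery_mode=2),  # persistent
        )
    # ack the single incoming message only after all copies are out
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="posts.incoming", on_message_callback=fan_out)
channel.start_consuming()
```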

you will find yourself needing to separate into "high" and "low" priority queues. use the low queue for work that is very bursty and has lower latency requirements. as a practical example, we do this when processing new tracks.

on one hand, we have manual uploads – manageable and steady rate because one user can only drag&drop so much. but these users are waiting for their track to go live.

on the other hand, there are batch ingestion processes that dump hundreds of thousands of tracks all at once – but nobody will know if they take an hour to process.

in certain places, we even configure pod priorities so the low queue workers can be evicted in favor of the high ones, until the cluster autoscaling catches up
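
in the simplest form, the split is just the producer picking a queue based on where the work came from. A sketch with pika, with invented queue names:

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="transcode.high", durable=True)
channel.queue_declare(queue="transcode.low", durable=True)

def enqueue_track(track_id, source):
    # interactive uploads go to the high-priority queue (a user is waiting),
    # bulk ingestion goes to the low-priority one (nobody is watching)
    queue = "transcode.high" if source == "web_upload" else "transcode.low"
    channel.basic_publish(
        exchange="",
        routing_key=queue,
        body=str(track_id).encode(),
        properties=pika.BasicProperties(delivery_mode=2),  # persistent
    )
```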

for every queue or Kafka consumer, consider when processing old events is no longer worth it. Examples:

  • cache invalidation: after your regular TTL, it doesn't make sense to process "hey thing changed" work
  • cache invalidation for things that change a lot, or where inconsistency is not obvious to the user: don't bother with old messages
  • who paid for a subscription: always process!
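
For the first two examples, the check can be as simple as comparing a timestamp the publisher put on the message against a per-queue maximum age. A sketch with pika, assuming the publisher sets the AMQP `timestamp` property; `invalidate_cache` is a stand-in:

```python
import time
import pika

MAX_AGE_SECONDS = 15 * 60  # e.g. the cache TTL; pick this per queue

def invalidate_cache(body):
    pass  # stand-in for the real invalidation work

def handle_invalidation(ch, method, properties, body):
    sent_at = properties.timestamp  # seconds since epoch, set by the publisher
    if sent_at is not None and time.time() - sent_at > MAX_AGE_SECONDS:
        # older than the cache TTL: processing it buys us nothing, drop it
        ch.basic_ack(delivery_tag=method.delivery_tag)
        return
    invalidate_cache(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)
```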

also have an idea under what circumstances you would even drop younger messages manually, and if necessary, how to restore order afterwards (can you get a list of things to resync elsewhere?)

handle poison pills. you probably want to retry work a few times, but even on "transient" errors, stop trying if they keep happening for the same message. this is tricky because some dependency may just be down and it's not a poison pill after all. have a process for a human to make a decision – either by allowing them to pull a bad message (or many) off the queue, or (often better) by shunting messages that keep failing unexpectedly to a dead letter queue that you manually go through later, or replay wholesale if the failure was external
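
one way to do the bookkeeping, sketched with pika: count attempts in a message header, re-publish a few times, and after that park the message on a dead letter queue instead of retrying forever. The queue names, header name, and `process` function are made up for illustration:

```python
import pika

MAX_ATTEMPTS = 5

def process(body):
    pass  # the actual message handler

def handle(ch, method, properties, body):
    attempts = (properties.headers or {}).get("x-attempts", 0) + 1
    try:
        process(body)
    except Exception:
        # re-publish instead of basic_nack, because a plain requeue
        # would lose the updated attempt counter
        target = "work" if attempts < MAX_ATTEMPTS else "work.dead-letter"
        ch.basic_publish(
            exchange="",
            routing_key=target,
            body=body,
            properties=pika.BasicProperties(
                delivery_mode=2, headers={"x-attempts": attempts}
            ),
        )
    # either way, this particular delivery is done
    ch.basic_ack(delivery_tag=method.delivery_tag)
```

RabbitMQ can also do part of the shunting for you with a dead letter exchange configured on the queue (the `x-dead-letter-exchange` argument), which catches rejected and expired messages.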

generally it's useful to have tools or ways to

  • inspect messages in the queue without taking them out (what's there but don't stop it from getting processed)
  • truncate a queue (stop trying to do this)
  • dump a queue to files or other storage (we will deal with this later)
  • load dumps into the queue, with rate limiting (later is now)
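
A rough sketch of the last two tools with pika – dump drains a queue into a JSON-lines file, load replays it with a crude rate limit. The names and file format are invented; real tooling will differ:

```python
import json
import time
import pika

def dump_queue(channel, queue, path):
    """Drain `queue` into a JSON-lines file (destructive: messages are acked)."""
    with open(path, "w") as out:
        while True:
            method, properties, body = channel.basic_get(queue=queue, auto_ack=False)
            if method is None:
                return  # queue is empty
            out.write(json.dumps({"body": body.decode("utf-8", "replace")}) + "\n")
            channel.basic_ack(delivery_tag=method.delivery_tag)

def load_dump(channel, queue, path, per_second=100):
    """Replay a dump into `queue`, rate limited so the workers aren't stampeded."""
    with open(path) as dump:
        for line in dump:
            channel.basic_publish(
                exchange="",
                routing_key=queue,
                body=json.loads(line)["body"].encode(),
                properties=pika.BasicProperties(delivery_mode=2),
            )
            time.sleep(1.0 / per_second)
```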

Had to think about monitoring a bit, so here goes. Different queue systems work very differently, so it's harder to make general statements. My experiences are with RabbitMQ (AMQP) and Kafka.

Fundamentally I see three different levels to worry about:

A. Is your application processing as it should be, and keeping up with work?
B. Is the queue system working from the application's point of view?
C. Is the queue system internally healthy?

For A, you want to know about throughput of every processor stage, any failures to process (retries, messages in the dead letter queue), and the first derivative of the queue length (or consumer lag in Kafka). If your queue length is going down, it will catch up eventually, and you don't need to act. We often alert on "will this queue reach 0 within some time"; you can do this with something like `predict_linear(queue_length[10m], 3600) > 0 and queue_length > 100` and a relatively generous `for` clause. Don't go overboard with predictive alerting though, and link a dashboard that shows what is going on in the way that operators think about it, otherwise you're going to confuse the responders and they will start ignoring the alert.

Keep in mind that queues do not add capacity (beyond batching benefits), so in the steady state all queues must be more or less empty.

For B, you care about publishing latency (how long the synchronous part takes), publishing errors (how often publishing fails), and how long it takes a message to get through an empty queue (the delay introduced by the queue system itself).

To measure this, have an end-to-end test that publishes the current time to its own queue, and consumes from it.
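
A minimal version of that canary, with pika and the prometheus_client library; the queue and metric names are invented:

```python
import time
import pika
from prometheus_client import Gauge, start_http_server

PUBLISH_LATENCY = Gauge("canary_publish_latency_seconds",
                        "Time the synchronous publish took")
E2E_DELAY = Gauge("canary_e2e_delay_seconds",
                  "Publish-to-consume delay through the otherwise empty queue")

def main():
    start_http_server(9100)  # expose the metrics for Prometheus to scrape
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="canary")

    while True:
        sent_at = time.time()
        channel.basic_publish(exchange="", routing_key="canary",
                              body=str(sent_at).encode())
        PUBLISH_LATENCY.set(time.time() - sent_at)

        time.sleep(1)  # give the broker a moment to route the message
        method, _properties, body = channel.basic_get(queue="canary", auto_ack=True)
        if method is not None:
            E2E_DELAY.set(time.time() - float(body))
        time.sleep(9)

if __name__ == "__main__":
    main()
```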

Aside: for the webhook use case, consider whether you can have a higher-level end-to-end test as well – submit an event, measure when you get the callback.

C is the cause-based alerting that tells you why B is not as expected, and it can be predictive (do something now or there will be problems). This is very system dependent.

For example, Kafka does not care whether anyone is consuming messages; it stores them anyway and doesn't do a whole lot of bookkeeping. You mostly need to watch disk metrics – I/O saturation and whether your disks are running full. You also need to keep an eye on partitioning, since each partition needs to fit on one disk, and very large partitions make broker maintenance a huge pain.

RabbitMQ does suffer just from having a lot of messages in the system. There, you need to look at ready and unacknowledged messages separately.

Ready messages are relatively low impact; as long as you have enough disk and memory, not-completely-ancient versions of RabbitMQ handle tens of millions OK. Load test this to see at what point you start running out of resources, or when latency increases.

Things get much worse when you have unacknowledged messages piling up. Think of ready messages as rows in a database – unacknowledged messages are open transactions. There's a ton more bookkeeping and coordination that happens around them, and they'll start impacting broker performance several orders of magnitude sooner. We had misbehaving clients that immediately "accepted" an unbounded number of messages, and then failed to process any of them because they were GC'ing violently. This almost brought the broker to its knees – once we killed the client, all messages went back to ready and Rabbit was happy again.
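
The usual client-side guard against this is a sane prefetch limit, so a consumer can never hold more than a bounded number of unacknowledged messages no matter how badly it misbehaves. A sketch with pika; the queue name and limit are illustrative:

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# The broker will never have more than 50 deliveries "in flight"
# (unacknowledged) on this channel, however fast it could push them.
channel.basic_qos(prefetch_count=50)

def do_work(body):
    pass  # stand-in for the real processing

def handle(ch, method, properties, body):
    do_work(body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="work", on_message_callback=handle)
channel.start_consuming()
```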

For both, figure out the overall numbers your setup can take with some load testing, and alert on getting near that globally. Optionally, you can break that down into a "backlog budget" for different queues. You may want to impose a lower limit on the pre-fanout queue than on the post-fanout one, because you know it is less bursty and a small backlog there will turn into many messages down the line.

As a backstop, also keep an eye on memory and disk utilization; in the steady state both will be low, but you don't want to run out when messages are piling up. The death spiral of a RabbitMQ-based system is when the broker is so loaded it starts slowing down, delivering messages to processors more slowly while the incoming rate stays constant. This is when the dump-to-file tool, and knowing which queues you can drop altogether, comes in really handy.

understanding the structure and limits of the broker infrastructure is helpful when you get to design something on top – for example, with Kafka it's entirely possible to build a stream-batch transition by having a consumer group only run once a day until it's caught up; RabbitMQ will not be your friend if you do this at high volume
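
a sketch of that stream-to-batch pattern with kafka-python, run once a day from cron: the consumer group remembers its offsets between runs, and `consumer_timeout_ms` ends the run once it has caught up (no new messages for a while). The topic, group, and `write_batch` are placeholders:

```python
from kafka import KafkaConsumer

def write_batch(batch):
    pass  # stand-in for the real batch output (files, warehouse, ...)

def run_daily_batch():
    consumer = KafkaConsumer(
        "events",                    # placeholder topic
        group_id="daily-batch",      # offsets are stored per group, so the
                                     # next run resumes where this one stopped
        bootstrap_servers=["kafka:9092"],
        enable_auto_commit=False,
        auto_offset_reset="earliest",
        consumer_timeout_ms=60000,   # stop iterating after 60s without messages
    )
    batch = [record.value for record in consumer]  # ends once we're caught up
    write_batch(batch)
    consumer.commit()                # only mark progress once the batch is safe
    consumer.close()

if __name__ == "__main__":
    run_daily_batch()
```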

"What is the difference between Kafka and a message queue like SQS and RabbitMQ?"

Fundamentally they are different systems with very different characteristics. In the overlap, where you are using either to distribute some kind of work item over workers, the different approaches make different trade-offs.

AMQP systems distribute each item (message) separately, and keep track of each item's completion for you. This is great for load balancing (any available worker can pick up any available item from a queue), but it adds a per-item overhead on the broker. A few hundred thousand unacknowledged messages, or a few tens of millions of ready ones, will bring a RabbitMQ to its knees. This also limits throughput. It does not give you delivery ordering guarantees.

Kafka on the other hand writes each message into a never-ending log, and it's on the consumer to track how far into that log it has gotten. A consumer can choose how often it stores that offset back into Kafka, so asymptotically you can process an arbitrary number of messages without adding load to Kafka. The theoretical limit here is network throughput. Within a partition, you get guaranteed ordering, which can be handy for some designs. You scale by adding partitions, but if you care about ordering, you need to be smart about distributing messages to partitions. Since one partition is only consumed by one worker in a consumer group, you have to scale partitions before you can scale workers. You can get imbalances where one worker is idle because its partition is less busy, while another is struggling to keep up. Kafka brokers are completely fine with billions of unprocessed messages, because to Kafka it does not make a darn difference whether a message is processed or not.
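
"Being smart about distributing messages to partitions" usually means choosing a partition key so that everything whose relative order matters (say, all events for one user) lands in the same partition. A sketch with kafka-python; the topic and payloads are invented:

```python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=["kafka:9092"],
    key_serializer=str.encode,
    value_serializer=str.encode,
)

# All events with the same key hash to the same partition, so that
# partition's single consumer sees them in publish order. Events for
# different users may still be reordered relative to each other.
producer.send("user-events", key="user-1234", value="followed user-5678")
producer.send("user-events", key="user-1234", value="unfollowed user-5678")
producer.flush()
```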

@bmhatfield

The theoretical limit here is network throughput.

Should folks think of this as "IO throughput" instead? For example, I remember experiencing cases where a consumer fell far enough behind to be "outside the page cache window", and this caused chaos if the IO profile of the machine couldn't sustain the write/read IO to/from disk.

@matthiasr

True, that's another limiting factor if a consumer is falling behind (which is a tricky-to-resolve latent failure mode). I haven't run into it in practice, but that is probably down to a combination of quite beefy Kafka servers (with lots of IO capacity) and not too many slow consumers.

@matthiasr

The thing is that this far into a backlog, an AMQP broker would almost certainly fall over from all the management overhead that it has to do on top of the IO.
