@nraychaudhuri
Created August 6, 2014 18:08
back pressure
I will try to put things into a larger perspective here, so be prepared: this is a long mail.
== Defining backpressure
First of all, we need a working definition of "backpressure". I put it in quotes since I will attack things from a very general perspective which might not exactly match what others think of as backpressure.
Sometimes it is easier to characterize the correct behavior of a system/protocol by defining what we consider incorrect behavior than by trying to enumerate correctness properties directly. In the case of backpressure there are two undesirable kinds of behavior:
- If the buffers in the system might grow without bounds then the backpressure protocol is not bounded
- If the protocol can become stuck, although the sender has messages to deliver and the receiver is also available to process them, then the backpressure protocol is not live
If we take these negative statements and turn them around, we arrive at one notion of backpressure which is simply: A correct backpressure protocol is both bounded and live.
Backpressure comes into the picture quite quickly in distributed/asynchronous systems, since blocking is not available (and not desirable) as a side-channel to stop the sender. Therefore all realistic backpressure protocols must be bounded and live, and must use only explicit channels of communication instead of blocking.
Let's dissect the boundedness property a bit. Given a sender system A, and the corresponding receiver B, we can identify the following parts:
- Input buffer of B with size Buf(B), containing elements Pending(B)
- (Possibly) a buffer of A with size Buf(A), containing elements Pending(A)
- Messages that are in one of the buffers or are in flight.
If we denote by InFlight the number of sent data messages that have left the buffer of A but have not yet been enqueued in the buffer of B, then we want to achieve the following:
Pending(B) + InFlight <= Buf(B)
Since A does not know how large the buffer of B is, or how many elements are in flight or have actually arrived, it needs an explicit communication channel from B that signals the available space by sending permit messages. Every slot of B's buffer that is neither occupied nor claimed by an in-flight message should be covered by a permit, so what we want is
Permits + InFlight + Pending(B) = Buf(B)
Since permits are also sent asynchronously, from the viewpoint of A there is an accumulated number of permits PendingPermits that it has received so far and has not used, and there might be permits that have not arrived yet:
PendingPermits + PermitsInFlight + InFlight + Pending(B) = Buf(B)
Slightly reorganized:
PendingPermits + PermitsInFlight = Buf(B) - Pending(B) - InFlight
We see two opposing "buffers": one is the buffer of B, and the other "buffer" is the PendingPermits at A. Together with the in-flight elements, these create a closed loop in which tokens (data, permits) circulate. Of course PendingPermits <= (Buf(B) - Pending(B)), otherwise we would overrun B.
It might look curious, but the buffer of A does not come into the picture at all. It is just a place where elements are taken from, and it does not directly take part in the control loop between A and B. It will serve a purpose though when I discuss composing backpressure later.
It is very important to realize that the closed loop above relies on two properties:
- no tokens are created (when B consumes a message, it produces a permit, if A consumes a permit, it produces a message), otherwise we would violate boundedness
- no tokens are lost, otherwise we might run out of circulating permits/data and deadlock
The above two properties become very important once the channels used can drop messages.
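A minimal sketch of the closed loop, assuming that elements waiting in B's buffer are counted as tokens too. The function and variable names are made up for illustration. It randomly interleaves the four events (A sends, data arrives, B consumes, permit arrives) and checks after every step that no token was created or lost and that B is never overrun.

```python
import random

def simulate_credit_loop(buf_b, steps, seed=0):
    """Randomly interleave the four protocol events and check the invariants.

    Returns how many elements B consumed. All names are illustrative.
    """
    rng = random.Random(seed)
    pending_permits = buf_b   # permits A has received and not yet used
    permits_in_flight = 0     # permits B has sent that A has not yet received
    in_flight = 0             # data A has sent that B has not yet enqueued
    pending_b = 0             # elements waiting in B's buffer
    consumed = 0

    for _ in range(steps):
        enabled = []
        if pending_permits > 0:
            enabled.append("send")
        if in_flight > 0:
            enabled.append("deliver")
        if pending_b > 0:
            enabled.append("consume")
        if permits_in_flight > 0:
            enabled.append("permit_arrives")

        event = rng.choice(enabled)
        if event == "send":              # A turns one permit into one data message
            pending_permits -= 1
            in_flight += 1
        elif event == "deliver":         # the data message reaches B's buffer
            in_flight -= 1
            pending_b += 1
        elif event == "consume":         # B turns one element into one permit
            pending_b -= 1
            permits_in_flight += 1
            consumed += 1
        else:                            # the permit reaches A
            permits_in_flight -= 1
            pending_permits += 1

        # No token created or lost: the total stays at Buf(B).
        assert pending_permits + permits_in_flight + in_flight + pending_b == buf_b
        # Boundedness: B's buffer can never be overrun.
        assert pending_b + in_flight <= buf_b
    return consumed
```

Since the total number of tokens never reaches zero, at least one event is always enabled, which is the liveness half of the definition.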
== Push vs. Pull
For simplicity, let us start with a simple scenario where there is always one data message or one permit circulating, i.e. we look at a Stop-And-Wait protocol. The push approach would look like this:
SEND - "Here is a message"
ACK - "I processed the message"
SEND - "Here is a message"
ACK - "I processed the message"
SEND - "Here is a message"
ACK - "I processed the message"
...
The pull approach would look like this:
REQUEST - "I want a message"
SEND - "Here is a message"
REQUEST - "I want a message"
SEND - "Here is a message"
REQUEST - "I want a message"
SEND - "Here is a message"
...
Looking at the two approaches, the repeating pattern is always the same:
...
PERMIT
DATA
PERMIT
DATA
...
The only difference is who sends the first message. So the question arises: if the two approaches are basically the same, why would we choose one over the other? The answer is that it depends on the management requirements of the participants. Take the master-workers example, where the master is only allowed to send a job to a worker if that worker is not currently processing any job. The push approach would require the master to be aware of:
- the set of all the currently on-line workers
- the state of all those workers (processing, idle)
On the other hand, if workers just (periodically) ask for jobs when they are idle the above information becomes implicitly available:
- if a worker is on-line, it will eventually ask for a job -- no need to keep track of it until then
- if a worker is idle, it will eventually ask for a job. If no one is idle, then no one will ask.
The above is a fan-out scenario. In a fan-in scenario we would see similar benefits of the push pattern compared to the pull one.
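A minimal sketch of the pull variant of the master-workers example. `Master`, `Worker`, `submit` and `request` are hypothetical names, and everything runs in one process for brevity. The point is that the master keeps no list of workers and no idle/busy state, only the REQUESTs it has not answered yet.

```python
from collections import deque

class Worker:
    def __init__(self, name):
        self.name = name
        self.processed = []

    def receive(self, job):          # SEND: "Here is a message"
        self.processed.append(job)

class Master:
    """Keeps no registry of workers; it only remembers unanswered REQUESTs."""
    def __init__(self):
        self.jobs = deque()
        self.requests = deque()

    def submit(self, job):
        if self.requests:
            self.requests.popleft().receive(job)
        else:
            self.jobs.append(job)

    def request(self, worker):       # REQUEST: "I want a message"
        if self.jobs:
            worker.receive(self.jobs.popleft())
        else:
            self.requests.append(worker)

master = Master()
w1, w2 = Worker("w1"), Worker("w2")
master.submit("job-1")
master.request(w1)       # w1 is idle: gets job-1 immediately
master.request(w2)       # no job available: the request is parked
master.submit("job-2")   # goes straight to the waiting w2
```

The job queue in this sketch is unbounded; in a real system it would need its own bound or dropping policy.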
= Surviving drops
Since we established that losing tokens (permit or data) eventually deadlocks the system, we must have a timeout+retry mechanism that recovers them in case of lossy channels. Unfortunately resending messages might introduce extra tokens (permit or data) if the original token was not actually lost, and we would violate boundedness -- therefore it is essential to track the IDs of messages and deduplicate them.
(Akka Streams currently has no recovery from dropped tokens. This is the reason why it is not safe to expose Akka Publishers/Subscribers remotely without adding recovery mechanisms.)
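A minimal sketch of receiver-side deduplication, assuming every data message carries a unique ID chosen by the sender. `DedupReceiver` and `on_message` are hypothetical names. A resend of a message that already arrived is acknowledged again, but it does not produce a second element, so no extra token enters the loop.

```python
class DedupReceiver:
    def __init__(self):
        self.seen = set()        # IDs of messages already accepted
        self.delivered = []      # payloads handed to the application

    def on_message(self, msg_id, payload):
        """Accept the payload at most once; always answer with an ACK."""
        if msg_id not in self.seen:
            self.seen.add(msg_id)
            self.delivered.append(payload)
        # Re-acknowledge duplicates too: the original ACK may have been lost.
        return ("ACK", msg_id)

receiver = DedupReceiver()
receiver.on_message(1, "a")
receiver.on_message(1, "a")      # resend after a timeout, original was not lost
receiver.on_message(2, "b")
```

The `seen` set grows without bound in this sketch; a real protocol would typically use sequence numbers so that only a small window of IDs has to be remembered.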
== Scheduling permits
In the above scenarios we considered the strict PERMIT -> DATA -> PERMIT -> ... progression, but this is not the most efficient one. Considering a push scenario and a buffer size of 4 at the receiver, the scheduling of permits might be the following:
SEND
SEND
SEND
SEND
PERMIT(4)
SEND
SEND
SEND
SEND
PERMIT(4)
...
This is much better in terms of throughput, since the sender needs to wait for the propagation delay of PERMIT only after every 4 sends instead of after every send -- amortizing its cost. Another possible option is to completely overlap permits and sends, avoiding waits most of the time. Considering a pull scenario and a buffer size of 4 at the receiver, with a PERMIT batch size of 2, we get the following:
PERMIT(4)
SEND
SEND
PERMIT(2) // Overlaps with the next 2 sends, since there are still 2 permits available at the sender
SEND
SEND
PERMIT(2) // Overlaps with the next 2 sends, since there are still 2 permits available at the sender
SEND
SEND
PERMIT(2) // Overlaps with the next 2 sends, since there are still 2 permits available at the sender
...
As you see, no tokens are lost: every 2 sends produce exactly 2 permits (after the messages have been dequeued and processed, of course).
(The above batching permit behavior is exactly the schedule Akka Streams implements.)
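A minimal sketch that generates the overlapping schedule above. It idealizes the system by assuming zero propagation delay and a receiver that processes every element immediately; `batched_schedule` is a hypothetical helper, not an Akka Streams API.

```python
def batched_schedule(buffer_size, batch, total_sends):
    """Return the PERMIT/SEND trace for a pull schedule with batched permits.

    Assumes batch <= buffer_size, so the sender never runs out of permits.
    """
    trace = [f"PERMIT({buffer_size})"]   # the receiver starts by offering its whole buffer
    permits = buffer_size                # permits currently held by the sender
    processed = 0                        # elements processed since the last PERMIT
    for _ in range(total_sends):
        assert permits > 0, "sender would have to wait here"
        permits -= 1
        trace.append("SEND")
        processed += 1
        if processed == batch:           # hand back exactly as many permits as were used
            trace.append(f"PERMIT({batch})")
            permits += batch
            processed = 0
    return trace

for line in batched_schedule(buffer_size=4, batch=2, total_sends=6):
    print(line)
```

With a buffer of 4 and a batch of 2 the sender always holds at least 2 unused permits when a new PERMIT is issued, which is what lets the permit travel while the next sends are already under way.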
== Composing Backpressure
Once there are multiple producer-consumer pairs chained together, the question arises of how these chains compose and work together. Once a processing element is a consumer and a producer at the same time, it participates in two distinct control loops. In the example where we tried to define backpressure there was a buffer in A which we did not use at all in the correctness characterization. In a chain scenario the buffer of A participates in the other control loop.
One interesting property that results from control loops being separate is that the buffer sizes may be different towards the upstream and downstream. In particular, the following chain can work:
[A] -- (Batched, overlapping permits) --> [B (bufferSize = 16)] -- (Stop-And-Wait) --> [TCP actor (bufferSize = 1)]
Since the control loops between A-B and B-TCP are distinct, it is possible to use different buffer sizes and schedules in those two channels.
(This is exactly how Akka Streams works. Backpressure is only local between a Publisher-Subscriber pair; global backpressure is a result of the aggregate local behavior.)
Things become trickier though when the transformations are not one-to-one. In the above example, B might be parsing chunks of text data arriving from A along line endings. This means that the following scenarios can all happen:
- B needs more than one message from A before it can send something to TCP: the messages "Hell", "o Worl", "d!\n" force B to consume 3 messages from A before it is able to send the parsed message "Hello World!" to TCP
- B produces more than one message from a message received from A. An example: the message from A "Hello World!\n Hello Akka!\n" will result in the messages "Hello World!" and "Hello Akka!".
- B produces more than one message from multiple messages received from A, but reframed. Example: "Hello W", "orld!\n Hello Akka!\n Hello Typesafe!\n" results in "Hello World!", "Hello Akka!", "Hello Typesafe!"
In all these cases B must drive the upstream and downstream control loops separately obeying the boundedness of tokens (permits/data) on both sides.
(These one-to-many, many-to-one, many-to-many scenarios are all handled by Akka Streams. In particular, the internal implementation has a DSL for expressing the "pumping" behavior between two chained control loops, safely transferring elements between them.)
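A minimal sketch of the reframing part of B, using the example messages from the list above. `LineFramer` is a hypothetical name, and stripping the leading space after a line ending is an assumption made only to match those examples. The sketch shows only how many output messages each input message yields; the permit accounting on the two sides is left out.

```python
class LineFramer:
    def __init__(self):
        self.partial = ""        # text received so far that does not end a line yet

    def on_chunk(self, chunk):
        """Consume one upstream message; return zero, one or many complete lines."""
        self.partial += chunk
        # Everything before the last "\n" is complete; the rest stays buffered.
        *lines, self.partial = self.partial.split("\n")
        return [line.strip() for line in lines]

many_to_one = LineFramer()
out1 = [many_to_one.on_chunk(c) for c in ["Hell", "o Worl", "d!\n"]]

many_to_many = LineFramer()
out2 = [many_to_many.on_chunk(c)
        for c in ["Hello W", "orld!\n Hello Akka!\n Hello Typesafe!\n"]]
```

An empty result means B has to spend another upstream permit before it can use a downstream one, and a result with several lines means B has to hold on to them until enough downstream permits arrive.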
== When should you NOT apply backpressure
While backpressure is an incredibly important design consideration, there are dangers in overusing it. Any reasonably sized system will have explicit or implicit communication cycles. Unfortunately, in the presence of cycles it is trivial to ensure boundedness but very hard to achieve liveness. If backpressure is used without care in cycles the system will eventually deadlock.
A very simple degenerate example:
--> [A one-to-many] --> [B one-to-one] --> (feeding back to A)
Obviously the above cycle will either need an unbounded buffer or will deadlock.
Deadlocks can happen in the presence of blocking clients as well. Consider the scenario
Client --> Server -- (echoing back) --> Client
where Server has a buffer size of 1024. Now the following blocking client code will deadlock:
blockingWrite(2048 bytes)
blockingRead()
The blocking write will pass 1024 bytes to the server, which cannot echo anything back yet, since it has no permit to do so (the client has not started reading). The client in turn cannot write any more, since it has used up all its permits. It stays blocked in the write call forever and never progresses to the read.
In my personal opinion, if the system is large it is better to sacrifice boundedness (or use large bounds) than to sacrifice liveness, i.e. it is fine to have unbounded buffers at certain points -- just make sure to observe their behavior under load and take them into consideration.
Of course there is another option, namely dropping.
== Dropping
Dropping messages is a very effective way of getting rid of unnecessary load and sidestepping some of the liveness issues. One naive assumption would be that dropping does not need the back-channel we used above (the permits). Unfortunately this might not be the case. If the underlying communication channel to the receiver is not droppy then the sender needs to decide on the dropping itself -- for that, it needs to know when dropping is needed.
Knowing when to drop is only one aspect of dropping. Knowing what to drop is just as important. It is quite common practice to drop from the tail of a queue. Unfortunately this is dangerous if the senders are expected to retry sending messages in case of drops. The following properties are true for FIFO queues:
- The message at the tail is the youngest
- The message at the head is the oldest
Now if we always drop from the tail and the queue is too large, the message at the head of the queue might be so old that it has been resent several times, and those resends are also in the queue already.
Therefore, in the face of resends it is always desirable to:
- Drop the oldest messages instead of the youngest, since the oldest will be redelivered soon
- Size the queue so that the waiting time is less than the resend timeout
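A minimal sketch of a drop-oldest queue built on `collections.deque` with `maxlen`, which discards from the opposite end when a full deque is appended to. `DropOldestQueue`, `offer` and `poll` are hypothetical names. For the sizing rule, the capacity would be chosen so that capacity / service rate stays below the resend timeout, e.g. fewer than 200 entries for 100 messages per second and a 2 second timeout.

```python
from collections import deque

class DropOldestQueue:
    def __init__(self, capacity):
        self._items = deque(maxlen=capacity)
        self.dropped = 0

    def offer(self, item):
        """Enqueue at the tail; if the queue is full, the head (oldest) is discarded."""
        if len(self._items) == self._items.maxlen:
            self.dropped += 1
        self._items.append(item)

    def poll(self):
        """Dequeue from the head, or return None if the queue is empty."""
        return self._items.popleft() if self._items else None

queue = DropOldestQueue(capacity=3)
for job in ["j1", "j2", "j3", "j4"]:
    queue.offer(job)             # "j1" is dropped when "j4" arrives
```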
== Unsafe, optimistic approaches
In the above I considered the backpressure techniques that are correct in the boundedness/liveness sense. There are simpler approaches that more or less ensure boundedness, but do not guarantee it.
= Squelch signals
/Squelch - to stop (something) from continuing by doing or saying something/
This is a very simple backpressure protocol. There are no permits, just a SQUELCH message that notifies the sender to back off for a while. This message is sent by the receiver if the number of elements in its buffer surpasses a high watermark value. This technique relies on the following property:
- If D is the delay to send the SQUELCH message to the sender and processing it, and R is the maximum rate of the sender, then D*R places must be available in the buffer.
The problem usually is that D can be very large, and vary considerably (especially if the SQUELCH message is lost and needs to be resent).
That said, if there is no other option to implement, having a SQUELCH signal is better than nothing -- just don't rely on it too much.
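A small worked example of the D*R rule, assuming D and R are known upper bounds (which, as said above, is the weak point). `high_watermark` is a hypothetical helper that places the watermark so that D*R slots remain free when SQUELCH is sent.

```python
import math

def high_watermark(buffer_size, squelch_delay_s, max_rate_per_s):
    """Highest fill level at which SQUELCH can still be sent without overflow.

    While the SQUELCH is travelling and being processed, up to D*R further
    elements can arrive, so that many slots must be kept free.
    """
    headroom = math.ceil(squelch_delay_s * max_rate_per_s)
    if headroom >= buffer_size:
        raise ValueError("buffer too small for this delay and rate")
    return buffer_size - headroom

# D = 0.25 s, R = 400 elements/s: 100 slots of headroom out of 1000.
watermark = high_watermark(buffer_size=1000, squelch_delay_s=0.25, max_rate_per_s=400)
```

If D doubles because one SQUELCH is lost and resent, the required headroom doubles as well, which is why this approach only approximates boundedness.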
= Source throttling
If the processing rate of the receiver is known, then it is possible to throttle back the sender without the use of a backchannel: by periodically producing PERMITs locally. This is usually called a Token Bucket and is essentially a simulation of the receiver sending PERMIT signals at a stable rate. Of course if the model assumption (the rate of the receiver) turns out to be false, this falls apart.
The benefit of this approach though is that it does not need any protocol modification on the receiver side, so if the receiver is a 3rd party system that has no backpressure protocol, this might be the only possible solution.
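A minimal sketch of a token bucket, with the current time passed in explicitly so that the behavior is easy to follow. `TokenBucket` and `try_acquire` are hypothetical names; `rate_per_s` is the assumed processing rate of the receiver and `capacity` the burst the receiver is assumed to tolerate.

```python
class TokenBucket:
    def __init__(self, rate_per_s, capacity, now=0.0):
        self.rate = rate_per_s       # locally generated permits per second
        self.capacity = capacity     # maximum number of stored permits
        self.tokens = float(capacity)
        self.last = now

    def try_acquire(self, now):
        """Return True and spend one permit if the sender may send at time `now`."""
        # Refill according to the elapsed time, never above the capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate_per_s=2, capacity=2)
burst = [bucket.try_acquire(now=0.0) for _ in range(3)]   # third send is refused
later = bucket.try_acquire(now=0.5)                       # one permit has been refilled
```

When `try_acquire` returns False the sender has to buffer, drop or summarize the message, just as it would when it has no permit from a real receiver.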
== An example system with backpressure design
To put these things together, imagine the following system:
- a master node, where external clients submit jobs
- worker nodes that process jobs coming through the master node
- an external monitoring tool, accepting UDP packets, but having no backpressure
We can then use the following backpressure design:
- clients use TCP towards the master, so backpressure on the wire level is provided by TCP itself
- master maintains a fixed size job queue, and since processing jobs might be slower than the rate of jobs arriving from TCP, it drops jobs from the front of the queue if necessary (it assumes they will be resubmitted)
- workers pull jobs from master, which simplifies the bookkeeping at the master but still achieves backpressure. Only a Stop-And-Wait schedule is needed, because processing a job takes much longer than the delay of the permit signal on the wire.
- workers publish metrics to the monitoring tool via UDP. The processing rate of the monitoring node is known, and we know the maximum number of workers, therefore we implement throttling at each of the workers so that the aggregate load is always below what the monitoring node can handle (assuming this is our only option here). In the case where we have more events than the throttle permits us to send, instead of dropping messages we start to roll them up into summaries (averages) until we can deliver them. (In Akka Streams this is what the "conflate" element is for.)
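A minimal sketch of the roll-up at a worker, assuming a metric is a plain number and the summary is a count plus a mean. `ConflatingSender`, `on_sample` and `on_permit` are hypothetical names and not the Akka Streams "conflate" API. Samples are folded into the summary while the throttle gives no permit, and one summary is emitted per permit.

```python
class ConflatingSender:
    """Rolls metric samples up into an average until a send permit is available."""
    def __init__(self):
        self.count = 0
        self.total = 0.0

    def on_sample(self, value):
        # Constant memory no matter how many samples arrive between permits.
        self.count += 1
        self.total += value

    def on_permit(self):
        """Called when the throttle allows one UDP packet; returns a summary or None."""
        if self.count == 0:
            return None
        summary = {"count": self.count, "mean": self.total / self.count}
        self.count = 0
        self.total = 0.0
        return summary

sender = ConflatingSender()
for sample in [1.0, 2.0, 3.0]:
    sender.on_sample(sample)     # three samples arrive before the next permit
packet = sender.on_permit()
```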