
@djspiewak
Created April 20, 2015 22:42

Actor Queue Notes

The first thing to understand is that the head field inside of scalaz.concurrent.Actor is not the "head" of the message queue in any traditional sense of the word. A better description would be "last". There are no pointers to the head of the queue at all, which is one of the very clever things about this implementation.

Empty Actor

Consider the case where the actor has no outstanding messages. A new message sent to it goes into the following code:

  def !(a: A): Unit = {
    val n = new Node(a)
    val h = head.getAndSet(n)
    if (h ne null) h.lazySet(n)
    else schedule(n)
  }

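As a preliminary, note that Node extends AtomicReference and contains a data cell, named a. head is itself of type AtomicReference[Node[A]], so the queue is a singly linked list of AtomicReferences in which each node's own value is its pointer to the next node (null if no successor has been linked yet).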
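A minimal sketch of that layout, assuming a simplified stand-in for the scalaz `Node` class (the real one is private to the actor). The helper `drain` and the hand-built three-node chain are hypothetical, for illustration only: each node's own `AtomicReference` value is its successor, and the only entry point that is kept is the newest node, here named `last`.

```scala
import java.util.concurrent.atomic.AtomicReference

// Simplified stand-in for scalaz's Node: the node *is* an AtomicReference
// whose value is the next node in the queue (null while none is linked).
final class Node[A](val a: A) extends AtomicReference[Node[A]]

// Hypothetical helper: walk the chain from a starting node via each next pointer.
def drain[A](start: Node[A]): List[A] = {
  val out = List.newBuilder[A]
  var cur = start
  while (cur ne null) {
    out += cur.a
    cur = cur.get
  }
  out.result()
}

// Hand-built chain 1 -> 2 -> 3. Only the newest node is tracked, which is
// the role of the field scalaz calls `head` (named `last` here).
val first = new Node(1)
val second = new Node(2)
val third = new Node(3)
first.set(second)
second.set(third)
val last = new AtomicReference[Node[Int]](third)
```

Note that nothing points at `first` except the local variable: in the actor, the only reference to the oldest unprocessed node is the one held by the running `act` loop.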

When no messages are outstanding, head is null, so getAndSet returns null, the conditional fails, and we immediately call schedule(n). This call kicks off the active execution of the actor, likely on a different thread. This execution takes place in the act method:

  @annotation.tailrec
  private def act(n: Node[A], i: Int = 1024): Unit = {
    try handler(n.a) catch {
      case ex: Throwable => onError(ex)
    }
    val n2 = n.get
    if (n2 eq null) scheduleLastTry(n)
    else if (i == 0) schedule(n2)
    else act(n2, i - 1)
  }

The initial value of n will be exactly what we passed to schedule(n), which is to say, the new head value. We handle this message and then call n.get, retrieving the next node to handle. Ignore the i counter for now (it is a thread utilization trick: once the budget runs out, act calls schedule(n2) to resubmit the rest of the queue as a fresh task rather than holding on to the thread indefinitely); the important bit is the recursive call to act in the event that n2 is not null.
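To make the `i` budget concrete, here is a single-threaded sketch. `MiniActor`, its `batch` parameter, and the queue-backed strategy are hypothetical stand-ins (scalaz's real `Strategy` normally hands tasks to a thread pool, and its actor also routes handler exceptions to `onError`, which is omitted here); `lastTry` and `next` are included only so the sketch runs. With a budget of 2, five messages are handled in two scheduled `act` runs plus one final `lastTry` task.

```scala
import java.util.concurrent.atomic.AtomicReference
import scala.collection.mutable

final class Node[A](val a: A) extends AtomicReference[Node[A]]

// Hypothetical stripped-down actor (not scalaz's class): same !/act/lastTry/next
// shape as the quoted code, but with a configurable budget and a plug-in
// `strategy` that receives each scheduled task as a thunk. No error handler.
final class MiniActor[A](handler: A => Unit, batch: Int)(strategy: (() => Unit) => Unit) {
  private val last = new AtomicReference[Node[A]] // scalaz calls this `head`

  def !(a: A): Unit = {
    val n = new Node(a)
    val h = last.getAndSet(n)
    if (h ne null) h.lazySet(n) else schedule(n)
  }

  private def schedule(n: Node[A]): Unit = strategy(() => act(n, batch))

  private def scheduleLastTry(n: Node[A]): Unit = strategy(() => lastTry(n))

  private def lastTry(n: Node[A]): Unit =
    if (!last.compareAndSet(n, null)) act(next(n), batch)

  @annotation.tailrec
  private def next(n: Node[A]): Node[A] = {
    val n2 = n.get
    if (n2 ne null) n2 else next(n)
  }

  // Handles up to batch + 1 messages per scheduled task, then yields by
  // scheduling the remainder as a new task (scalaz's budget is 1024).
  @annotation.tailrec
  private def act(n: Node[A], i: Int): Unit = {
    handler(n.a)
    val n2 = n.get
    if (n2 eq null) scheduleLastTry(n)
    else if (i == 0) schedule(n2)
    else act(n2, i - 1)
  }
}

// Deferred strategy: tasks are queued and run one at a time by the loop
// below, so every scheduled run can be counted.
val pending = mutable.Queue.empty[() => Unit]
val seen = mutable.ArrayBuffer.empty[Int]
val actor = new MiniActor[Int](x => seen += x, batch = 2)(task => pending += task)

(1 to 5).foreach(actor ! _)

var runs = 0
while (pending.nonEmpty) {
  pending.dequeue()()
  runs += 1
}

// The queue drained, so lastTry's CAS reset `last` to null; the next send
// therefore gets null back from getAndSet and schedules a fresh run.
actor ! 6
val tasksAfterRestart = pending.size
```

Because this strategy only queues tasks, each resubmission shows up as a separate entry in `pending`; with a real thread pool, that same resubmission is what lets other tasks sharing the pool get a turn.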

Non-Empty Actor

Meanwhile, let's assume that more messages have been coming into the actor. As these messages come in, remember that act is currently running and processing the queue! We run the getAndSet in ! on head, and the result is not null since we had messages outstanding. Thus, we need to add the message to the tail end of the queue. Notice, though, that we have already stored the new Node in head! This is why "head" is a poor descriptor. It really, really is more like "last".

Linking the new last node into the queue is as simple as updating the next pointer on the old last node, which we do using the lazySet method on AtomicReference. This is a particular point of cleverness, because lazySet is a much cheaper write than set, at the cost of weaker visibility guarantees! It is an ordered (release) store: writes made before it cannot be reordered after it, but it skips the expensive store-load fence that a volatile set requires, so the new value can sit in the CPU's store buffer for a short time before other cores see it. The downside is that other threads may not observe the link right away.
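The two producer-side steps can be isolated in a small sketch. `Inbox` and `enqueue` are hypothetical names, not scalaz API; the method performs the same `getAndSet` followed by `lazySet` as `!`, but instead of scheduling it returns the node the consumer should start from when the queue was empty. (Since Java 9, `lazySet` is documented as having the memory effects of `VarHandle.setRelease`.)

```scala
import java.util.concurrent.atomic.AtomicReference

final class Node[A](val a: A) extends AtomicReference[Node[A]]

// Hypothetical producer-side helper (not a scalaz API): the two steps of `!`
// without the scheduling. Returns Some(n) when the queue was empty, meaning
// the caller must start a consumer at n; None when n was linked behind the old last.
final class Inbox[A] {
  val last = new AtomicReference[Node[A]] // scalaz's `head`

  def enqueue(a: A): Option[Node[A]] = {
    val n = new Node(a)
    // Step 1: atomically make n the new "last" and learn which node preceded it.
    val prev = last.getAndSet(n)
    if (prev eq null) Some(n)
    else {
      // Step 2: link prev -> n with a release store: cheap, and earlier writes
      // (such as n's fields) cannot move after it, but readers may see it late.
      prev.lazySet(n)
      None
    }
  }
}

val inbox = new Inbox[String]
val first = inbox.enqueue("a")  // queue was empty: caller would schedule
val second = inbox.enqueue("b") // linked behind "a"
```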

Intuitively, this seems wrong for two reasons. First, what if we have multiple concurrent threads attempting to write messages? Second, how does the message become visible on the act thread? We'll come back to the concurrent writes point, but the visibility question is easily addressed.

Note the call to scheduleLastTry(n) at the end of act when the next node is null. This case represents the state "we think we hit the end of the queue", and it is handled in the following way:

  private def scheduleLastTry(n: Node[A]): Unit = strategy(lastTry(n))

  private def lastTry(n: Node[A]): Unit = if (!head.compareAndSet(n, null)) act(next(n))

  @annotation.tailrec
  private def next(n: Node[A]): Node[A] = {
    val n2 = n.get
    if (n2 ne null) n2
    else next(n)
  }

All of this is obfuscation around two things: an atomic check on head to see if there are any extra values that we may have missed, and a call back to act passing the value of next(n). We only call back to act if head is not equal to the value of n. Now, remember that head really should be called "last", and n corresponds to the very last value of the queue that we found in our traversal. Thus, we're really just checking to see if our traversal got to the real end, and we're doing that check atomically. If the check passes, then we null out head and the actor atomically moves back to the "empty actor" state. If the check fails, we go into next(n).
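The window that `lastTry` guards against can be replayed deterministically on one thread by running a producer's `!` in two halves. This is a hand-simulated interleaving (the names are illustrative, not scalaz code): the consumer sees no successor on `n1`, yet its compare-and-set fails because a producer has already installed `n2` as "last" without having linked it yet.

```scala
import java.util.concurrent.atomic.AtomicReference

final class Node[A](val a: A) extends AtomicReference[Node[A]]

val last = new AtomicReference[Node[Int]] // scalaz's `head`

// The consumer has just handled n1, which is currently the last node.
val n1 = new Node(1)
last.set(n1)

// A producer runs only the first half of `!`: it swaps n2 in as "last" but
// has not linked n1 -> n2 yet (imagine it was descheduled right here).
val n2 = new Node(2)
val prev = last.getAndSet(n2)

// Consumer side of lastTry: no visible successor, so try to go back to empty.
val consumerSawNoSuccessor = n1.get eq null
val becameEmpty = last.compareAndSet(n1, null) // fails: "last" is now n2

// The producer finishes its `!`, which is what next(n1) would be waiting for.
prev.lazySet(n2)
val successorAfterLink = n1.get
```

The failed CAS is the proof that a successor exists even though `n1.get` was still `null`, which is exactly the situation the busy-wait in `next` resolves.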

Slow-Path Read Resolution

This is the clever bit: next is a busy-wait! Note that all it's doing is repeatedly reading the value of the same AtomicReference, over and over again, until it is non-null. There's no recursive traversal here; it's just the same Node. The reason for this is that we know there are more values in the queue, because our atomic check on head (which should be "last") failed. If the check had passed, then we would know that n was in fact the very last node in the queue and we would be done. The check failed though, which means that there is a subsequent node which just isn't visible yet on the current thread (remember: the link to it is written with lazySet rather than set, and only after the writer's getAndSet on head).

Note that the node which is in head might not be the next node we need to process. All we know about head is that it contains the very last node in the queue, but there may be several nodes which have come in and not yet become visible on the current thread. It is for this reason that we cannot simply read the value of head; we have to spin-wait on the value of our current node being non-null.
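A small threaded sketch of that spin-wait, assuming a simplified `Node` and a copy of the quoted `next`: the consumer thread spins on `n1` while the main thread, playing a slow producer, publishes the link with `lazySet` after a short sleep. The 50 ms delay and the 5 s join timeout are arbitrary values chosen for the demo.

```scala
import java.util.concurrent.atomic.AtomicReference

final class Node[A](val a: A) extends AtomicReference[Node[A]]

// Copy of the quoted `next`: re-read one node's link until it is non-null.
// Only safe once a failed CAS on "last" has proven that a successor exists.
@annotation.tailrec
def next[A](n: Node[A]): Node[A] = {
  val n2 = n.get
  if (n2 ne null) n2 else next(n)
}

val n1 = new Node("first")
val n2 = new Node("second")
val found = new AtomicReference[Node[String]]()

val consumer = new Thread(new Runnable {
  def run(): Unit = found.set(next(n1)) // spins until the link appears
})
consumer.setDaemon(true) // never keep the JVM alive if the spin were stuck
consumer.start()

Thread.sleep(50) // the "producer" is slow to link its node
n1.lazySet(n2)   // release store; the spinning reader picks it up shortly after
consumer.join(5000)
```

Each iteration of the compiled loop performs a fresh volatile read of the same node, so the spin ends as soon as the lazily published link becomes visible.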

Fast-Path Write Performance

The reason for lazySet is that it is several times faster than set (usually around 4x-8x). We could use set instead, which would make the spin in next(n) rarer, though not impossible: a writer can still be descheduled between its getAndSet on head and its write to the old node. The price, however, would be making ! dramatically slower! Not only that, but it would make ! slower in the common case in exchange for making next faster in the uncommon case. Thus, we fast-path optimize by relaxing our write barrier semantics, and the result is a screaming-fast ! function.

This optimization is only made possible because of the atomic check on head. Because we have an atomic cell with full memory barrier semantics that allows us to efficiently check for the existence of further elements, we are able to enter the spin wait with atomic certainty.

Concurrent Writes

The only remaining case we have not addressed is that of concurrent writers. There are no compareAndSet calls to update the state of the queue inside of !, so intuitively, how do we not lose messages under contention? Recall the definition of !:

  def !(a: A): Unit = {
    val n = new Node(a)
    val h = head.getAndSet(n)
    if (h ne null) h.lazySet(n)
    else schedule(n)
  }

Imagine multiple threads simultaneously stepping through this function. Each creates its own new Node and then calls getAndSet on head with it, atomically swapping in the new node and receiving the previous value. This intuitively seems like trouble, since there is no compare-and-swap retry loop, no check of what was there before, and no atomic update of the link pointers. However, it turns out that this works just fine!

One of the threads in this function will get the "old" value of head (perhaps null, or perhaps some other node). The other thread will get the node set by the first thread. In other words, no data is lost; under contention, a thread's node simply ends up in the hands of the next thread, which is responsible for linking it forward!

If the old value of head was null, only one of the threads will get that old null value, due to the atomicity of getAndSet. That thread will make the call to schedule(n). Meanwhile, the other thread got the new node set by the first thread and has that value in h. Remember that head is really "last". We got the old value of "last" (in h), and we have atomically stored the new value of "last" in head. This atomic storage signals to act that there are more values to read and, if act reaches lastTry before our link is visible, sends it into the spin-wait in next. The only thing we have left to do is update the next pointer on the old "last" so that it points to our new "last", and we do this using lazySet.

Recall that getAndSet is fully atomic regardless of contention. There may be any number of threads in this function simultaneously, and they will all get different values of h. Thus, the fact that every thread is unconditionally calling lazySet is not a problem and does not discard data, since only one thread will ever call lazySet on a given node! The other thing that this implies is that the queue is being concurrently and non-linearly constructed across multiple threads. As contention increases, the amount of non-linearity also goes up, but since no two threads get the same value for h (as guaranteed by getAndSet), ultimate linearization is strongly guaranteed.
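The claim that every concurrent `getAndSet` hands out a distinct predecessor can be checked with a stress sketch. It runs only the two writer steps of `!` (no actor, no scheduling) from several threads; the thread and message counts are arbitrary. Afterwards exactly one send should have seen `null`, every other send a different node, and walking the chain from that first node should visit every message, with each writer's messages in the order it sent them.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

final class Node[A](val a: A) extends AtomicReference[Node[A]]

val threads = 8
val perThread = 5000
val total = threads * perThread
val last = new AtomicReference[Node[Int]] // scalaz's `head`
val nodes = new Array[Node[Int]](total)   // nodes(k): node created for send k
val preds = new Array[Node[Int]](total)   // preds(k): h returned to send k
val go = new CountDownLatch(1)

val workers = (0 until threads).map { t =>
  val th = new Thread(new Runnable {
    def run(): Unit = {
      go.await() // release all writers together to maximize contention
      var i = 0
      while (i < perThread) {
        val k = t * perThread + i
        val n = new Node(k)
        nodes(k) = n
        val h = last.getAndSet(n) // the writer steps of `!`, minus schedule
        preds(k) = h
        if (h ne null) h.lazySet(n)
        i += 1
      }
    }
  })
  th.start()
  th
}
go.countDown()
workers.foreach(_.join()) // join also makes every lazySet visible here

// Walk the chain from the one node whose predecessor was null.
var cur = nodes(preds.indexWhere(_ eq null))
val order = Array.newBuilder[Int]
while (cur ne null) {
  order += cur.a
  cur = cur.get
}
val visited = order.result()
```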

As the lazySet writes eventually become visible to other threads, the next function will find the next element of the queue and allow act to continue. There may still be more links that are not yet visible, but act will correctly identify this case in the lastTry function just as it did before. The actor will not stop until it manages to successfully write null into head, after which the next thread to call ! gets null back from getAndSet and re-schedules the actor.
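Putting the pieces together, here is a hypothetical end-to-end sketch, not the scalaz class itself: a `MiniActor` whose strategy submits to a fixed thread pool, fed by several sender threads at once. The handler checks two properties argued for above, that it never runs concurrently with itself and that each sender's messages arrive in order, while the actor may drain, reset `last` to `null` in `lastTry`, and be rescheduled by a later `!` several times during the run. Pool size, message counts, and the 30 s timeout are arbitrary.

```scala
import java.util.concurrent.{CountDownLatch, Executors, ThreadFactory, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, AtomicReference}

final class Node[A](val a: A) extends AtomicReference[Node[A]]

// Hypothetical reconstruction of the whole algorithm (not the scalaz class):
// `strategy` is a plain Runnable => Unit instead of scalaz's Strategy.
final class MiniActor[A](handler: A => Unit, strategy: Runnable => Unit) {
  private val last = new AtomicReference[Node[A]] // scalaz's `head`

  def !(a: A): Unit = {
    val n = new Node(a)
    val h = last.getAndSet(n)
    if (h ne null) h.lazySet(n) else schedule(n)
  }

  private def schedule(n: Node[A]): Unit =
    strategy(new Runnable { def run(): Unit = act(n, 1024) })

  private def scheduleLastTry(n: Node[A]): Unit =
    strategy(new Runnable { def run(): Unit = lastTry(n) })

  private def lastTry(n: Node[A]): Unit =
    if (!last.compareAndSet(n, null)) act(next(n), 1024)

  @annotation.tailrec
  private def next(n: Node[A]): Node[A] = {
    val n2 = n.get
    if (n2 ne null) n2 else next(n)
  }

  @annotation.tailrec
  private def act(n: Node[A], i: Int): Unit = {
    handler(n.a)
    val n2 = n.get
    if (n2 eq null) scheduleLastTry(n)
    else if (i == 0) schedule(n2)
    else act(n2, i - 1)
  }
}

// Daemon pool threads, so the demo can exit without shutting the pool down
// while the actor may still be submitting its final lastTry task.
val pool = Executors.newFixedThreadPool(4, new ThreadFactory {
  def newThread(r: Runnable): Thread = { val t = new Thread(r); t.setDaemon(true); t }
})

val producers = 4
val perProducer = 20000
val done = new CountDownLatch(producers * perProducer)
val active = new AtomicInteger(0)
val overlap = new AtomicBoolean(false)
val outOfOrder = new AtomicBoolean(false)
val lastSeen = Array.fill(producers)(-1) // plain array: only the handler touches it

val handler: ((Int, Int)) => Unit = { case (p, i) =>
  if (active.incrementAndGet() != 1) overlap.set(true) // two handlers at once?
  if (i != lastSeen(p) + 1) outOfOrder.set(true)       // per-sender order broken?
  lastSeen(p) = i
  active.decrementAndGet()
  done.countDown()
}
val actor = new MiniActor[(Int, Int)](handler, r => pool.execute(r))

val senders = (0 until producers).map { p =>
  val t = new Thread(new Runnable {
    def run(): Unit = {
      var i = 0
      while (i < perProducer) { actor ! ((p, i)); i += 1 }
    }
  })
  t.start()
  t
}
senders.foreach(_.join())
val finished = done.await(30, TimeUnit.SECONDS)
```

The handler's state (`lastSeen`) is a plain array with no locking; the sketch relies on the actor's single-consumer guarantee, plus the happens-before edges from task submission and from the atomic operations on `last`, to keep it consistent.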

Summary

This is very dense, very convoluted code, but it is also extremely clever and absurdly fast. The overhead here is very low. While I have some quibbles with the code style (e.g. the naming of head), I do not believe that there are any bugs or suboptimalities in the algorithm as written.

@geggo98

geggo98 commented Apr 22, 2015

Nice analysis! Any idea, why they check for null and don't use a case class or the Option data type to find out if the next Node exists?

@plokhotnyuk

Nice article, Daniel!

Commits in this fork are my attempt to improve scalability and mitigate some problems of Scalaz actors (including your proposal to rename head to last): https://github.com/plokhotnyuk/scalaz

Here is another example how it can be resolved by extension from AtomicReference:
https://github.com/plokhotnyuk/actors/blob/master/src/test/scala/com/github/gist/viktorklang/Actor.scala#L33

@djspiewak
Author

@geggo98 Using a null check saves a TON of allocations and also eliminates a lot of bounds checking (or at the very least, casting) in places where we already know by external guarantee that the references are non-empty. I'm generally not in favor of using null as a performance cheat, but in this case due to the lean constraints of the implementation, I think it's a fine tradeoff.

@plokhotnyuk Nice. :-)

@plokhotnyuk

@djspiewak here is an article from @nitsanw that sheds light on other aspects of this concurrent structure:
http://psy-lob-saw.blogspot.com/2015/04/porting-dvyukov-mpsc.html

Some other practices are waiting for comprehensive review and delightful explanations:
