@evanphx
Created March 20, 2014 21:48
def read(n)
start:
  while true
    offset = @buffer.current_offset

    # If we can't update the offset, someone else did it at the same time,
    # go again.
    next unless cas(&@buffer.current_offset, offset, offset+n)

    # If the buffer contains all the data we need, return it!
    if offset + n < @buffer.size
      return @buffer.buf(offset,offset+n)
    end

    # Otherwise grab the lock and make a new buffer
    @resize_lock.synchronize do
      goto start if offset + n <= @buffer.size

      start = @buffer.buf(offset,@buffer.size)
      rest = n - (@buffer.size - offset)

      new_buffer = Buffer.new(self)
      new_buffer.fill_at_least(rest)

      ret = start + new_buffer.buf(0,rest)
      new_buffer.current_offset = rest
      @buffer = new_buffer

      return ret
    end
  end
end
@headius commented Mar 21, 2014

This code isn't thread-safe, unfortunately. If thread A reaches line 12 while thread B is reallocating the buffer, the offsets in thread A may not make any sense. In the JDK libraries, the class for this sort of setup is AtomicStampedReference, and the JDK implements it by CASing a single immutable tuple of the reference object and the offset all at once. You can't CAS two separate values (someday we might have DCAS, but we don't have it now).

I looked at making Array thread-safe using a similar technique, but you end up paying the cost of allocating a new immutable tuple on every update. That's ok for some things (read-mostly structures), but not ok for heavily mutated data structures like arrays and IO buffers.

I'm interested in finding a better solution too, FWIW.
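
(For concreteness, here is a minimal JRuby-only sketch of the JDK class mentioned above; it is an editorial illustration, not code from the gist. The stamp plays the role of the read offset, the reference plays the role of the buffer, and the plain String is a stand-in for the gist's Buffer object.)

require 'java'   # JRuby only

buffer = "0123456789"    # stand-in for the gist's Buffer
ref = java.util.concurrent.atomic.AtomicStampedReference.new(buffer, 0)

stamp_holder = Java::int[1].new       # out-param that receives the current stamp
buf    = ref.get(stamp_holder)        # reference and stamp are read together
offset = stamp_holder[0]

n = 4
# Advance the offset only if neither the buffer nor the offset changed
# since we read them; otherwise the caller retries with a fresh snapshot.
if ref.compare_and_set(buf, buf, offset, offset + n)
  chunk = buf[offset, n]              # these n bytes now belong to this thread
end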

@chuckremes

I agree with @headius now that I've started to try to implement this. @evanphx, what do you think of adding a new line between lines 3 and 4 that essentially does "buffer = @buffer", so that each thread works on its own local reference? That way, if thread B comes along and allocates a new @buffer at line 27, thread A will still be operating on its own local reference and will continue to "see" the original buffer. Hopefully @yorickpeterse sees this too.

@chuckremes

@headius thanks for that pointer to AtomicStampedReference. That looks potentially useful in this case regardless of its limitations.

@headius commented Mar 22, 2014

buffer = @buffer doesn't fix the problem either, unfortunately, since you might still end up with a mismatched buffer and offset :-( You have to be able to get and set both the buffer and the offset atomically for this approach to work, which is pretty much only possible (at the hardware level) if you're swapping a tuple.
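
(A minimal sketch of the "swapping a tuple" approach in plain Ruby, assuming the concurrent-ruby gem; the PairState and PairReader names are invented for the example, and Buffer#buf and Buffer#size are taken from the pseudocode above. Because the buffer and the offset live in one immutable object behind a single Concurrent::AtomicReference, a reader can never observe one without the other.)

require 'concurrent'

# Immutable (buffer, offset) pair; every update swaps in a fresh instance.
PairState = Struct.new(:buffer, :offset)

class PairReader
  def initialize(buffer)
    @state = Concurrent::AtomicReference.new(PairState.new(buffer, 0))
  end

  # Atomically reserve n bytes of the current buffer, or return nil when the
  # buffer is exhausted and the (locked) refill path has to run instead.
  def reserve(n)
    loop do
      old = @state.get
      return nil if old.offset + n > old.buffer.size
      if @state.compare_and_set(old, PairState.new(old.buffer, old.offset + n))
        return old.buffer.buf(old.offset, old.offset + n)
      end
      # CAS lost the race: another reader advanced the pair first, so retry.
    end
  end
end

As noted above, the trade-off is a fresh PairState allocation on every reservation attempt.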

@chuckremes

@headius I am a little slow on the uptake here. How would the buffer and offset be mismatched in this race?

If we store a local reference to @buffer (buffer = @buffer), then we are working with that local reference's #current_offset too.

def read(n)
start:
  while true
    # assignment is atomic, so store a reference to @buffer locally. In the case 
    # where another thread replaces it with a new buffer, we still have our
    # reference to the original.
    buffer = @buffer
    offset = buffer.current_offset

    # If we can't update the offset, someone else did it at the same time,
    # go again.
    next unless cas(&buffer.current_offset, offset, offset+n)

    # If the buffer contains all the data we need, return it!
    if offset + n < buffer.size
      return buffer.buf(offset,offset+n)
    end

    # Otherwise grab the lock and make a new buffer
    @resize_lock.synchronize do
      goto start if offset + n <= buffer.size

      start = buffer.buf(offset,buffer.size)
      rest = n - (buffer.size - offset)

      new_buffer = Buffer.new(self)
      new_buffer.fill_at_least(rest)

      ret = start + new_buffer.buf(0,rest)
      new_buffer.current_offset = rest
      @buffer = new_buffer

      return ret
    end
  end
end
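
(Building on the pair idea above, a hypothetical restructuring of read(n), not the author's code: the (buffer, offset) pair is snapshotted and swapped through a single Concurrent::AtomicReference, and the refill path first claims the tail of the old buffer with a CAS before publishing the new pair. Buffer#buf, Buffer#size, Buffer#fill_at_least and @resize_lock are assumed from the pseudocode above.)

require 'concurrent'

State = Struct.new(:buffer, :offset)   # immutable snapshot, swapped as a unit

def read(n)
  loop do
    old = @state.get                   # @state is a Concurrent::AtomicReference
    buffer, offset = old.buffer, old.offset

    if offset + n <= buffer.size
      # Fast path: reserve [offset, offset+n) by swapping in a fresh pair.
      next unless @state.compare_and_set(old, State.new(buffer, offset + n))
      return buffer.buf(offset, offset + n)
    end

    # Slow path: refill under the lock, then publish a new pair.
    @resize_lock.synchronize do
      # Claim everything left in the current buffer. If this fails, another
      # thread advanced or replaced the pair first, so retry from the top.
      next unless @state.compare_and_set(old, State.new(buffer, buffer.size))

      head = buffer.buf(offset, buffer.size)
      rest = n - (buffer.size - offset)

      new_buffer = Buffer.new(self)
      new_buffer.fill_at_least(rest)

      ret = head + new_buffer.buf(0, rest)
      @state.set(State.new(new_buffer, rest))   # single atomic publish
      return ret
    end
  end
end

This keeps the shape of the original (lock-free fast path, locked refill) while making it impossible to observe the buffer and offset in a mismatched state; the cost is the per-operation State allocation @headius describes.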
