def read(n)
  start:
  while true
    offset = @buffer.current_offset
    # If we can't update the offset, someone else did it at the same time,
    # go again.
    next unless cas(&@buffer.current_offset, offset, offset+n)
    # If the buffer contains all the data we need, return it!
    if offset + n < @buffer.size
      return @buffer.buf(offset,offset+n)
    end
    # Otherwise grab the lock and make a new buffer
    @resize_lock.synchronize do
      goto start if offset + n <= @buffer.size
      start = @buffer.buf(offset,@buffer.size)
      rest = n - (@buffer.size - offset)
      new_buffer = Buffer.new(self)
      new_buffer.fill_at_least(rest)
      ret = start + new_buffer.buf(0,rest)
      new_buffer.current_offset = rest
      @buffer = new_buffer
      return ret
    end
  end
end
I agree with @headius now that I've started trying to implement this. @evanphx, what do you think of adding a new line between 3 & 4 that essentially does "buffer = @buffer" so that each thread works on its own local reference? That way, if thread B comes along and allocates a new @buffer at line 27, thread A will be operating on its own local reference and will continue to "see" the original buffer. Hopefully @yorickpeterse sees this too.
@headius thanks for that pointer to AtomicStampedReference. That looks potentially useful in this case regardless of its limitations.
buffer = @buffer doesn't fix the problem either, unfortunately, since you might still get a mismatched buffer and offset :-( You have to be able to get/set both buffer and offset atomically for this approach to work, which is pretty much only possible (at a hardware level) if you're swapping a single reference to a tuple that holds both.
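To make the tuple idea concrete, here is a minimal Ruby sketch, not the gist's implementation. `State`, `AtomicRef`, and `reserve` are hypothetical names, and the Mutex inside `AtomicRef` only stands in for a real single-reference CAS so the example runs on plain Ruby. The point it tries to show is that buffer and offset live in one immutable object, so a reader swaps both or neither.

```ruby
# Hypothetical sketch: none of these names come from the gist.
# Immutable snapshot pairing a buffer with the next unread offset.
State = Struct.new(:buf, :offset)

# Stand-in for a hardware CAS on a single reference. The Mutex is here only
# so the sketch runs on plain Ruby; it is an assumption, not a recommendation.
class AtomicRef
  def initialize(value)
    @value = value
    @lock = Mutex.new
  end

  def get
    @lock.synchronize { @value }
  end

  # Install new_value only if the current value is the very same object.
  def compare_and_set(expected, new_value)
    @lock.synchronize do
      if @value.equal?(expected)
        @value = new_value
        true
      else
        false
      end
    end
  end
end

# Claim n bytes from the current snapshot. Returns nil when the buffer
# cannot satisfy the request, which is where a refill path would start.
def reserve(ref, n)
  loop do
    state = ref.get
    return nil if state.offset + n > state.buf.bytesize
    claimed = State.new(state.buf, state.offset + n).freeze
    # Buffer and offset are replaced together, or the swap fails and we retry.
    return state.buf.byteslice(state.offset, n) if ref.compare_and_set(state, claimed)
  end
end

ref = AtomicRef.new(State.new("abcdefgh".freeze, 0).freeze)
first  = reserve(ref, 3)  # "abc"
second = reserve(ref, 3)  # "def"
third  = reserve(ref, 3)  # nil, only two bytes remain
```

The refill path is left out on purpose. A `nil` from `reserve` is where a caller would take the resize lock and publish a fresh `State` holding the new buffer and its starting offset in a single swap.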
@headius I am a little slow on the uptake here. How would the buffer and offset be mismatched in this race?
If we store a local reference to @buffer (buffer = @buffer) then we are working with the local copy's #current_offset too.
def read(n)
  start:
  while true
    # assignment is atomic, so store a reference to @buffer locally. In the case
    # where another thread replaces it with a new buffer, we still have our
    # reference to the original.
    buffer = @buffer
    offset = buffer.current_offset
    # If we can't update the offset, someone else did it at the same time,
    # go again.
    next unless cas(&buffer.current_offset, offset, offset+n)
    # If the buffer contains all the data we need, return it!
    if offset + n < buffer.size
      return buffer.buf(offset,offset+n)
    end
    # Otherwise grab the lock and make a new buffer
    @resize_lock.synchronize do
      goto start if offset + n <= buffer.size
      start = buffer.buf(offset,buffer.size)
      rest = n - (buffer.size - offset)
      new_buffer = Buffer.new(self)
      new_buffer.fill_at_least(rest)
      ret = start + new_buffer.buf(0,rest)
      new_buffer.current_offset = rest
      @buffer = new_buffer
      return ret
    end
  end
end
This code isn't thread-safe, unfortunately. If thread A reaches line 12 while thread B reallocates the buffer, the offsets in thread A may not make any sense. In the JDK libraries, this sort of setup is called an "AtomicStampedReference", and the JDK implements it by CASing a tuple of the reference object and the offset all at once. You can't CAS two independent values at once (someday we might have DCAS, but we don't have it now).
I looked at making Array thread-safe using a similar technique, but you end up paying the cost of allocating a new immutable tuple every time. That's OK for some things (read-mostly), but not OK for heavily mutated data structures like arrays and IO buffers.
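A rough way to see that allocation cost on CRuby, assuming `GC.stat(:total_allocated_objects)` is available (it is CRuby-specific). `Pair`, `allocations`, and `UPDATES` are made-up names for this sketch. It only counts objects and says nothing about real IO buffer performance.

```ruby
# Hypothetical names throughout; relies on CRuby's GC.stat counters.
Pair = Struct.new(:buf, :offset)

# Count how many objects are allocated while the block runs.
def allocations
  before = GC.stat(:total_allocated_objects)
  yield
  GC.stat(:total_allocated_objects) - before
end

UPDATES = 10_000
buf = "x" * 16

# Mutable style: one long-lived object, offset bumped in place.
mutable = Pair.new(buf, 0)
in_place = allocations do
  UPDATES.times { mutable.offset += 1 }
end

# Tuple-swap style: every update builds a fresh immutable pair.
current = Pair.new(buf, 0).freeze
swapped = allocations do
  UPDATES.times { current = Pair.new(current.buf, current.offset + 1).freeze }
end

puts "in place: #{in_place} objects, tuple swap: #{swapped} objects"
```

The tuple-swap loop allocates at least one object per update, which is the per-mutation cost described above. The in-place loop reuses one object, but gives up the atomic pairing of buffer and offset.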
I'm interested in finding a better solution too, FWIW.