@headius
Forked from jstorimer/0-README.md
Last active December 15, 2015 19:09

This is a proof-of-concept of a couple of concurrent data structures written in Ruby.

The implementations are heavily commented for those interested. Benchmarks (with results) are included below. The results are interesting but, as always, take them with a grain of salt.

Data structures

AtomicLinkedQueue is a lock-free queue, built on atomic CAS operations. It doesn't use any mutexes.

TwoLockLinkedQueue is a lock-based queue, but with separate locks for the head and the tail. Pushes contend only with other pushes (for the tail lock) and pops only with other pops (for the head lock), so a push never has to wait on a pop, or vice versa.

Both of these implementations are unbounded queues, with no blocking operations.
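
As a point of comparison, here is a minimal sketch of what "no blocking operations" means in practice. It uses only the standard library's Queue: a plain pop on an empty Queue blocks the calling thread, and the non-blocking form raises ThreadError. The queues in this gist return nil from pop when they are empty instead. The helper name try_pop is hypothetical and exists only for this illustration.

```ruby
require 'thread'

# Hypothetical helper: gives a stdlib Queue the same "nil when empty"
# behaviour that the two queues in this gist have on #pop.
def try_pop(queue)
  queue.pop(true)   # a truthy argument requests a non-blocking pop
rescue ThreadError  # raised by the non-blocking pop when the queue is empty
  nil
end

q = Queue.new
q.push('item')

first  = try_pop(q)  # the item that was pushed
second = try_pop(q)  # nil, because the queue is now empty
```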

See the individual files below for more about their implementation.

Background

For those unfamiliar with the atomic compare-and-swap operation, here's a brief outline of how it operates.

Create an Atomic instance that holds a value. (This comes from the atomic rubygem).

item = Atomic.new('value')

The workhorse method is #compare_and_set(old, new). You pass it the value that you think it currently holds, and the value you want to set it to.

If it does hold the expected value, it's set to your new value and the call returns true. Otherwise, nothing is changed and it returns false. In this implementation, when that happens, the operation is restarted, over and over, until it succeeds.

This compare-and-set operation is hardware-supported and works across Ruby implementations thanks to the 'atomic' gem. The hardware support provides the atomic guarantee. Without this guarantee, your thread could read a value, another thread could update it in the meantime, and your thread would then overwrite that update without ever seeing it. This stuff is complicated.
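
The retry pattern can be sketched as follows. To keep the example runnable without the gem, it uses a stand-in class, SketchAtomic, which is an assumption of this sketch and not part of the 'atomic' gem: it simulates the atomic guarantee with a Mutex, whereas the real thing relies on hardware support. Only the compare_and_set semantics and the retry loop are the point here.

```ruby
require 'thread'

# Stand-in for the gem's Atomic class so this sketch runs without the
# 'atomic' gem. A Mutex simulates the atomicity; only the semantics match.
class SketchAtomic
  def initialize(value)
    @value = value
    @lock = Mutex.new
  end

  def value
    @lock.synchronize { @value }
  end

  # Sets the value to new_value only if it currently equals expected.
  # Returns true on success, false if the value had already changed.
  def compare_and_set(expected, new_value)
    @lock.synchronize do
      return false unless @value == expected
      @value = new_value
      true
    end
  end
end

counter = SketchAtomic.new(0)

threads = 4.times.map do
  Thread.new do
    1_000.times do
      # Retry loop: re-read the value and try again whenever the CAS fails.
      loop do
        current = counter.value
        break if counter.compare_and_set(current, current + 1)
      end
    end
  end
end
threads.each(&:join)

total = counter.value  # 4 threads * 1_000 increments each
```

No increment is lost, because a thread that loses the race simply sees compare_and_set return false and goes around the loop again with a fresh read.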

Due credit

I can't take credit for this implementation. This is an implementation of the pseudo-code from "Simple, Fast, and Practical Non-Blocking and Blocking Concurrent Queue Algorithms"[1], with some hints from Java's implementation of java.util.concurrent.ConcurrentLinkedQueue[2].

  1. http://www.cs.rochester.edu/research/synchronization/pseudocode/queues.html
  2. http://grepcode.com/file/repository.grepcode.com/java/root/jdk/openjdk/6-b14/java/util/concurrent/ConcurrentLinkedQueue.java

Benchmark results

These are the results of running the included benchmarks against Rubinius 2.0.0-rc1 and JRuby 1.7.3. Any results from MRI will be superficial due to the global lock, so I omitted them completely.

All of the benchmarks were run against Queue (a fully synchronized queue from Ruby's std lib), TwoLockLinkedQueue (implementation below using two different locks), and AtomicLinkedQueue (the lock-free implementation).

The benchmark_concurrent_reads+writes.rb benchmark allocates some threads to write to the queue, and others to read. So there's always contention for both pushing and popping.

The benchmark_separate_reads+writes.rb benchmark first focuses on filling up the queue to capacity, then emptying it completely. This focuses all of the contention on writing, then all of it on reading.
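
The fill-then-empty pattern can be sketched on a small scale with only the standard library. This is an illustration, not the benchmark file itself: the thread count and item count are arbitrary small values chosen so it runs quickly, and it exercises only Queue.

```ruby
require 'benchmark'
require 'thread'

thread_count = 4        # assumption: small values so the sketch runs quickly
items_per_thread = 250

queue = Queue.new

elapsed = Benchmark.realtime do
  threads = thread_count.times.map do
    Thread.new do
      # Phase 1: this thread only pushes.
      items_per_thread.times { queue.push('item') }

      # Phase 2: this thread only pops. The non-blocking pop raises
      # ThreadError once the queue is empty, which ends the loop.
      loop do
        begin
          queue.pop(true)
        rescue ThreadError
          break
        end
      end
    end
  end
  threads.each(&:join)
end

remaining = queue.size  # whichever thread pushes last also drains the queue
```

Each thread moves through the two phases on its own, so the phases of different threads can overlap; the queue still ends up empty because the thread that finishes pushing last keeps popping until nothing is left.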

Rubinius results

Note that these results are out of date, because I couldn't get rbx to complete the benchmarks.

$ rbx benchmark_concurrent_reads+writes.rb
                                                        user     system      total        real
Queue - more writers than readers                   4.963446   4.536519   9.499965 (  6.903411)
Queue - more readers than writers                   5.774266   5.047607  10.821873 (  7.791965)
Queue - equal writers and readers                   5.398079   6.132016  11.530095 (  8.328646)
                                                       user     system      total        real
TwoLockLinkedQueue - more writers than readers      5.585377   7.506778  13.092155 (  6.561008)
TwoLockLinkedQueue - more readers than writers      5.312031   8.011661  13.323692 (  6.503712)
TwoLockLinkedQueue - equal writers and readers      6.560957  10.342055  16.903012 (  7.134648)
                                                       user     system      total        real
AtomicLinkedQueue - more writers than readers       6.380076   0.114960   6.495036 (  2.058872)
AtomicLinkedQueue - more readers than writers       2.369595   0.065632   2.435227 (  0.706512)
AtomicLinkedQueue - equal writers and readers       3.712438   0.123236   3.835674 (  1.225167)

$ rbx benchmark_separate_reads+writes.rb

Queue - fill, then empty - 10k                 0.191729   0.127249   0.318978 (  0.220572)
Queue - fill, then empty - 100k                1.466753   1.268979   2.735732 (  1.848341)
Queue - fill, then empty - 1mm                11.818961  12.738940  24.557901 ( 18.820045)
                                                   user     system      total        real
TwoLockLinkedQueue - fill, then empty - 10k    0.153434   0.159268   0.312702 (  0.210763)
TwoLockLinkedQueue - fill, then empty - 100k   1.335991   1.537175   2.873166 (  2.124443)
TwoLockLinkedQueue - fill, then empty - 1mm   11.675556  16.168307  27.843863 ( 21.965406)
                                                   user     system      total        real
AtomicLinkedQueue - fill, then empty - 10k     0.230940   0.007993   0.238933 (  0.037483)
AtomicLinkedQueue - fill, then empty - 100k    1.692784   0.021976   1.714760 (  0.364502)
AtomicLinkedQueue - fill, then empty - 1mm    11.247611   0.196520  11.444131 (  5.000861)

JRuby results

$ jruby benchmark_separate_reads+writes.rb 
                                                    user     system      total        real
Queue - fill, then empty - 10k                  1.190000   0.040000   1.230000 (  0.225000)
Queue - fill, then empty - 100k                 1.310000   0.110000   1.420000 (  0.288000)
Queue - fill, then empty - 1mm                  2.110000   0.430000   2.540000 (  0.681000)
                                                    user     system      total        real
TwoLockLinkedQueue - fill, then empty - 10k     2.380000   0.160000   2.540000 (  0.695000)
TwoLockLinkedQueue - fill, then empty - 100k    2.390000   0.520000   2.910000 (  0.930000)
TwoLockLinkedQueue - fill, then empty - 1mm    14.580000   3.780000  18.360000 (  6.983000)
                                                    user     system      total        real
AtomicLinkedQueue - fill, then empty - 10k      5.760000   0.060000   5.820000 (  0.978000)
AtomicLinkedQueue - fill, then empty - 100k    37.360000   0.170000  37.530000 (  5.823000)
AtomicLinkedQueue - fill, then empty - 1mm     50.230000   0.410000  50.640000 (  8.396000)
                                                    user     system      total        real
Queue - fill, then empty - 10k                  0.130000   0.010000   0.140000 (  0.037000)
Queue - fill, then empty - 100k                 0.290000   0.020000   0.310000 (  0.078000)
Queue - fill, then empty - 1mm                  1.930000   0.410000   2.340000 (  0.670000)
                                                    user     system      total        real
TwoLockLinkedQueue - fill, then empty - 10k     0.530000   0.030000   0.560000 (  0.168000)
TwoLockLinkedQueue - fill, then empty - 100k    1.080000   0.430000   1.510000 (  0.741000)
TwoLockLinkedQueue - fill, then empty - 1mm    16.940000   3.910000  20.850000 (  7.803000)
                                                    user     system      total        real
AtomicLinkedQueue - fill, then empty - 10k      0.280000   0.000000   0.280000 (  0.084000)
AtomicLinkedQueue - fill, then empty - 100k     1.940000   0.030000   1.970000 (  0.423000)
AtomicLinkedQueue - fill, then empty - 1mm     21.590000   0.150000  21.740000 (  4.222000)
                                                    user     system      total        real
Queue - fill, then empty - 10k                  0.080000   0.010000   0.090000 (  0.026000)
Queue - fill, then empty - 100k                 0.200000   0.050000   0.250000 (  0.066000)
Queue - fill, then empty - 1mm                  1.730000   0.430000   2.160000 (  0.641000)
                                                    user     system      total        real
TwoLockLinkedQueue - fill, then empty - 10k     0.180000   0.070000   0.250000 (  0.085000)
TwoLockLinkedQueue - fill, then empty - 100k    0.730000   0.400000   1.130000 (  0.597000)
TwoLockLinkedQueue - fill, then empty - 1mm    12.510000   3.920000  16.430000 (  6.930000)
                                                    user     system      total        real
AtomicLinkedQueue - fill, then empty - 10k      0.280000   0.010000   0.290000 (  0.056000)
AtomicLinkedQueue - fill, then empty - 100k     1.560000   0.010000   1.570000 (  0.279000)
AtomicLinkedQueue - fill, then empty - 1mm     17.030000   0.130000  17.160000 (  2.807000)

$ jruby benchmark_concurrent_reads+writes.rb 
                                                         user     system      total        real
Queue - more writers than readers                    4.030000   0.230000   4.260000 (  0.856000)
Queue - more readers than writers                   13.350000   1.140000  14.490000 ( 11.366000)
Queue - equal writers and readers                    0.610000   0.190000   0.800000 (  0.279000)
                                                         user     system      total        real
TwoLockLinkedQueue - more writers than readers       8.290000   1.610000   9.900000 (  2.516000)
TwoLockLinkedQueue - more readers than writers       3.730000   1.650000   5.380000 (  2.403000)
TwoLockLinkedQueue - equal writers and readers       4.560000   1.780000   6.340000 (  2.450000)
                                                         user     system      total        real
AtomicLinkedQueue - more writers than readers      101.420000   0.490000 101.910000 ( 15.250000)
AtomicLinkedQueue - more readers than writers       12.600000   0.090000  12.690000 (  2.102000)
AtomicLinkedQueue - equal writers and readers       11.690000   0.080000  11.770000 (  1.929000)
                                                         user     system      total        real
Queue - more writers than readers                    5.590000   0.610000   6.200000 (  4.843000)
Queue - more readers than writers                   12.570000   1.080000  13.650000 ( 10.701000)
Queue - equal writers and readers                    1.860000   0.090000   1.950000 (  0.395000)
                                                         user     system      total        real
TwoLockLinkedQueue - more writers than readers       4.130000   1.220000   5.350000 (  1.802000)
TwoLockLinkedQueue - more readers than writers       2.950000   1.680000   4.630000 (  2.295000)
TwoLockLinkedQueue - equal writers and readers       4.640000   1.960000   6.600000 (  2.622000)
                                                         user     system      total        real
AtomicLinkedQueue - more writers than readers       15.320000   0.080000  15.400000 (  2.288000)
AtomicLinkedQueue - more readers than writers        6.670000   0.060000   6.730000 (  1.100000)
AtomicLinkedQueue - equal writers and readers        5.630000   0.050000   5.680000 (  0.940000)
                                                         user     system      total        real
Queue - more writers than readers                   12.490000   1.220000  13.710000 ( 10.951000)
Queue - more readers than writers                   12.570000   1.130000  13.700000 ( 11.102000)
Queue - equal writers and readers                    0.710000   0.250000   0.960000 (  0.452000)
                                                         user     system      total        real
TwoLockLinkedQueue - more writers than readers       3.570000   1.510000   5.080000 (  1.763000)
TwoLockLinkedQueue - more readers than writers       5.030000   1.910000   6.940000 (  2.909000)
TwoLockLinkedQueue - equal writers and readers       4.680000   2.280000   6.960000 (  2.894000)
                                                         user     system      total        real
AtomicLinkedQueue - more writers than readers        8.850000   0.060000   8.910000 (  1.454000)
AtomicLinkedQueue - more readers than writers        5.020000   0.040000   5.060000 (  0.935000)
AtomicLinkedQueue - equal writers and readers        5.560000   0.040000   5.600000 (  1.011000)

# atomic_linked_queue.rb

# This is a proof-of-concept of a concurrent, lock-free FIFO data
# structure written in Ruby. It leverages atomic updates, rather than
# lock-based synchronization.

require 'atomic'

class AtomicLinkedQueue
  class Node
    def initialize(item, successor)
      @item = Atomic.new(item)
      @successor = Atomic.new(successor)
    end

    def successor
      @successor.value
    end

    def update_successor(old, new)
      @successor.compare_and_set(old, new)
    end

    def item
      @item.value
    end

    def update_item(thing)
      # this feels dangerous...
      @item.update { thing }
    end
  end

  def initialize
    dummy_node = Node.new(:dummy, nil)

    @head = Atomic.new(dummy_node)
    @tail = Atomic.new(dummy_node)
  end

  def push(item)
    # allocate a new node with the item embedded
    new_node = Node.new(item, nil)

    # keep trying until the operation succeeds
    loop do
      current_tail_node = @tail.value
      current_tail_successor = current_tail_node.successor

      # if our stored tail is still the current tail
      if current_tail_node == @tail.value
        # if that tail was really the last node
        if current_tail_successor.nil?
          # if we can update the previous successor of tail to point to this new node
          if current_tail_node.update_successor(current_tail_successor, new_node)
            # then update tail to point to this node as well
            @tail.compare_and_set(current_tail_node, new_node)
            # and return
            return true
            # else, start the loop over
          end
        else
          # in this case, the tail ref we had wasn't the real tail
          # so we try to set its successor as the real tail, then start the loop again
          @tail.compare_and_set(current_tail_node, current_tail_successor)
        end
      end
    end
  end

  def pop
    # retry until some value can be returned
    loop do
      # the value in @head is just a dummy node that always sits in that position,
      # the real 'head' is in its successor
      current_dummy_node = @head.value
      current_tail_node = @tail.value

      current_head_node = current_dummy_node.successor

      # if our local head is still consistent with the head node, continue
      # otherwise, start over
      if current_dummy_node == @head.value
        # if either the queue is empty, or falling behind
        if current_dummy_node == current_tail_node
          # if there's nothing after the 'dummy' head node
          if current_head_node.nil?
            # just return nil
            return nil
          else
            # here the head element succeeding head is not nil, but the head and tail are equal
            # so tail is falling behind, update it, then start over
            @tail.compare_and_set(current_tail_node, current_head_node)
          end

        # the queue isn't empty
        # if we can set the dummy head to the 'real' head, we're free to return the value in that real head, success
        elsif @head.compare_and_set(current_dummy_node, current_head_node)
          # grab the item from the popped node
          item = current_head_node.item

          if item != nil
            current_head_node.update_item(nil)
          end

          # return it, success!
          return item

        # else
        #   try again
        end
      end
    end
  end

  def size
    successor = @head.value.successor
    count = 0

    loop do
      break if successor.nil?

      current_node = successor
      successor = current_node.successor
      count += 1
    end

    count
  end
end

# benchmark_concurrent_reads+writes.rb

require 'benchmark'
require 'thread'
require_relative 'atomic_linked_queue'
require_relative 'two_lock_linked_queue'

thread_count = 50
iterations = 10_000
queue_klasses = [Queue, TwoLockLinkedQueue, AtomicLinkedQueue]

Thread.abort_on_exception = true

# this one tells all the threads when to start
$go = false

def setup(queue, writer_thread_count, reader_thread_count, iterations)
  tg = ThreadGroup.new

  # spawn writer threads
  writer_thread_count.times do
    t = Thread.new do
      # wait until the bm starts to do the work. This should
      # minimize variance.
      nil until $go
      iterations.times do
        queue.push('item')
      end
    end

    tg.add(t)
  end

  # spawn reader threads
  if queue.class == Queue
    # the Queue class gets special behaviour because its #pop
    # method is blocking by default.
    reader_thread_count.times do
      t = Thread.new do
        nil until $go
        iterations.times do
          begin
            queue.pop(:nonblocking)
          rescue
          end
        end
      end

      tg.add(t)
    end
  else
    reader_thread_count.times do
      t = Thread.new do
        nil until $go
        iterations.times do
          queue.pop
        end
      end

      tg.add(t)
    end
  end

  tg
end

def exercise(tg)
  $go = true
  tg.list.each(&:join)
  $go = false
end

3.times do
  queue_klasses.each do |klass|
    Benchmark.bm(50) do |bm|
      queue = klass.new
      tg = setup(queue, thread_count, (thread_count * 0.6).to_i, iterations)
      bm.report("#{klass} - more writers than readers") { exercise(tg) }

      queue = klass.new
      tg = setup(queue, (thread_count * 0.6).to_i, thread_count, iterations)
      bm.report("#{klass} - more readers than writers") { exercise(tg) }

      queue = klass.new
      tg = setup(queue, thread_count, thread_count, iterations)
      bm.report("#{klass} - equal writers and readers") { exercise(tg) }
    end
  end
end

# benchmark_separate_reads+writes.rb

require 'benchmark'
require 'thread'
require_relative 'atomic_linked_queue'
require_relative 'two_lock_linked_queue'

thread_count = 50
queue_klasses = [Queue, TwoLockLinkedQueue, AtomicLinkedQueue]

Thread.abort_on_exception = true

# this one tells all the threads when to start
$go = false

def setup(queue, thread_count, queue_length)
  tg = ThreadGroup.new

  if queue.class == Queue
    thread_count.times do
      t = Thread.new do
        # wait until the bm starts to do the work. This should
        # minimize variance.
        nil until $go
        (queue_length / thread_count).to_i.times do
          queue.push('item')
        end

        loop do
          begin
            result = queue.pop(:nonblock)
          rescue => e
            break
          end
        end
      end

      tg.add(t)
    end
  else
    thread_count.times do
      t = Thread.new do
        nil until $go
        (queue_length / thread_count).to_i.times do
          queue.push('item')
        end

        result = true
        until result.nil?
          result = queue.pop
        end
      end

      tg.add(t)
    end
  end

  GC.start
  tg
end

def exercise(tg)
  $go = true
  tg.list.each(&:join)
  $go = false
end

3.times do
  queue_klasses.each do |klass|
    Benchmark.bm(45) do |bm|
      queue = klass.new
      tg = setup(queue, thread_count, 10_000)
      bm.report("#{klass} - fill, then empty - 10k") { exercise(tg) }
      raise 'hell' unless queue.size.zero?

      queue = klass.new
      tg = setup(queue, thread_count, 100_000)
      bm.report("#{klass} - fill, then empty - 100k") { exercise(tg) }
      raise 'hell' unless queue.size.zero?

      queue = klass.new
      tg = setup(queue, thread_count, 1_000_000)
      bm.report("#{klass} - fill, then empty - 1mm") { exercise(tg) }
      raise 'hell' unless queue.size.zero?
    end
  end
end

# two_lock_linked_queue.rb

require 'thread'

class TwoLockLinkedQueue
  # This Node doesn't need atomic updates, it assumes
  # that you're modifying it while holding a lock
  Node = Struct.new(:item, :successor)

  def initialize
    dummy_node = Node.new(:dummy, nil)

    @head_node = dummy_node
    @tail_node = dummy_node

    @head_lock = Mutex.new
    @tail_lock = Mutex.new
  end

  def push(item)
    # allocate a new node with the item embedded
    new_node = Node.new(item, nil)

    @tail_lock.synchronize do
      # update the successor of the current tail to point to the new node
      @tail_node.successor = new_node
      @tail_node = new_node
    end
  end

  def pop
    @head_lock.synchronize do
      dummy_node = @head_node
      head = @head_node.successor

      if head.nil? # then queue was empty
        return nil
      else
        # the current head becomes the new 'dummy' head
        @head_node = head
        # return its value
        return head.item
      end
    end
  end

  def size
    successor = @head_node.successor
    count = 0

    loop do
      break if successor.nil?

      current_node = successor
      successor = current_node.successor
      count += 1
    end

    count
  end
end