public
Created

An example race condition using fibers

  • Download Gist
fiber_race.rb
Ruby
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169
## SCROLL DOWN FOR THE RACE, THE FOLLOWING IS JUST UTILITY FROM COMMONLY USED CODE.
 
# Author:: Mohammad A. Ali (mailto:oldmoe@gmail.com)
# Copyright:: Copyright (c) 2008 eSpace, Inc.
# License:: Distributes under the same terms as Ruby
 
require 'fiber'
 
# Adds fiber-local variables to Fiber, in the spirit of Thread#[] / Thread#[]=.
class Fiber
  # Attribute Reference -- returns the value of a fiber-local variable.
  #
  # The name may be given as a symbol or as a string; both forms address the
  # same variable (strings are converted to symbols before the lookup).
  # Returns nil when the variable has never been set on this fiber.
  def [](key)
    local_fiber_variables[fiber_local_key(key)]
  end

  # Attribute Assignment -- sets or creates a fiber-local variable.
  #
  # Accepts a symbol or a string name, interchangeably. See also Fiber#[].
  def []=(key, value)
    local_fiber_variables[fiber_local_key(key)] = value
  end

  private

  # Lazily created per-fiber storage, kept in an instance variable of the
  # fiber object itself.
  def local_fiber_variables
    @local_fiber_variables ||= {}
  end

  # Maps "name" and :name onto one canonical key so the documented
  # "symbol or string" contract actually holds. Keys that cannot be turned
  # into a symbol are used unchanged.
  def fiber_local_key(key)
    key.respond_to?(:to_sym) ? key.to_sym : key
  end
end
 
# A fixed-size pool of reusable fibers. Each fiber runs one block at a time;
# blocks submitted while every fiber is busy wait in an internal queue.
#
# Relies on Fiber#[] and Fiber#[]= being available for fiber-local storage.
class FiberPool
  # The fibers that are currently idle (an Array).
  attr_reader :fibers

  # The fibers that are currently running a block, keyed by object_id.
  attr_reader :busy_fibers

  # Procs registered here are called, in registration order, every time a
  # fiber finishes a block. Good for releasing resources such as
  # ActiveRecord database connections.
  attr_accessor :generic_callbacks

  # Builds +count+ idle fibers. A fiber that finishes its block first tries
  # to take the next queued block; only when the queue is empty does it go
  # back to the idle list.
  def initialize(count = 100)
    @fibers = []
    @busy_fibers = {}
    @queue = []
    @generic_callbacks = []
    count.times { @fibers << build_worker }
  end

  # Runs +block+ on an idle fiber right away if one exists; otherwise the
  # block is queued until a fiber frees up. Returns the pool itself so the
  # queue is never exposed to the caller.
  def spawn(&block)
    worker = @fibers.shift
    if worker
      worker[:callbacks] = []
      @busy_fibers[worker.object_id] = worker
      worker.resume(block)
    else
      @queue << block
    end
    self
  end

  private

  # Creates one pool fiber with empty fiber-local :callbacks and :em_keys.
  def build_worker
    worker = Fiber.new do |job|
      loop do
        job.call
        run_callbacks
        job = next_job
      end
    end
    worker[:callbacks] = []
    worker[:em_keys] = []
    worker
  end

  # Fires the current fiber's own callbacks in reverse order (much like C++
  # destructors), then every generic callback. The fiber-local list is
  # re-read on each pass, so callbacks added while draining also run.
  def run_callbacks
    Fiber.current[:callbacks].pop.call until Fiber.current[:callbacks].empty?
    generic_callbacks.each { |callback| callback.call }
  end

  # Returns the next block for the current fiber: a queued one if available,
  # otherwise parks the fiber as idle and yields until #spawn resumes it
  # with a new block.
  def next_job
    return @queue.shift unless @queue.empty?

    current = Fiber.current
    @busy_fibers.delete(current.object_id)
    @fibers.unshift(current)
    Fiber.yield
  end
end
 
# A unit of work that walks through a fixed list of steps inside a pooled
# fiber, pausing for simulated I/O between some of them.
#
# The read (load_value), modify (business_logic) and write (store_value)
# steps are separated by I/O pauses, so other jobs can interleave between
# them. That lost-update race on DB.value is what this script demonstrates.
class Job
  attr_reader :id

  # Records the id and step list, then hands #run to the global POOL.
  def initialize(id)
    @id = id
    @steps = [:start, :load_value, :business_logic, :store_value, :finish]
    POOL.spawn { run }
  end

  # Executes the remaining steps in order, printing [id, step] before each.
  # A step is removed from the list before it is run.
  def run
    while (step = @steps.shift)
      p [@id, step]
      send(step)
    end
  end

  # Step 1: nothing but an I/O pause.
  def start
    do_io
  end

  # Step 2: snapshot the shared value, then pause for I/O.
  def load_value
    @value = DB.value
    do_io
  end

  # Step 3: increment the private snapshot, then pause for I/O.
  def business_logic
    @value = @value + 1
    do_io
  end

  # Step 4: write the snapshot back, overwriting whatever is there now.
  def store_value
    DB.value = @value
  end

  # Step 5: no-op. Because steps are removed before they run, its presence
  # keeps #done? false until store_value has finished.
  def finish
  end

  # True once every step has been taken off the list.
  def done?
    @steps.empty?
  end

  private

  # Registers the current fiber with the global IOLOOP and suspends it until
  # something resumes it.
  def do_io
    IOLOOP.wait(Fiber.current) # would normally pass some resource here to wait against
    Fiber.yield
  end
end
 
# A toy I/O loop: fibers register as waiting, and each tick resumes a random
# subset of them.
class Io
  # avg_sleep_time scales the random pause taken at the start of every tick;
  # the pause is rand * avg_sleep_time seconds.
  def initialize(avg_sleep_time)
    @waiters = []
    @avg_sleep_time = avg_sleep_time
  end

  # Registers +fiber+ as waiting for "I/O" to complete.
  def wait(fiber)
    @waiters.push(fiber)
  end

  # Runs one tick: pause briefly, pick a random batch of waiting fibers,
  # drop them from the wait list and resume each one.
  def run
    sleep(rand * @avg_sleep_time)

    shuffled = @waiters.sort_by { rand }
    # Roughly up to 30% of the "IO" completes in this tick. rand truncates
    # the float bound to an integer; when that is zero it returns a float
    # below 1, which the range slice treats as index 0. So a non-empty wait
    # list always releases at least one fiber per tick.
    last_index = rand(@waiters.size * 0.3)
    finished = shuffled[0..last_index]

    # Remove before resuming so a fiber that waits again is not dropped.
    @waiters -= finished
    finished.each(&:resume)
  end
end
 
# Shared fiber pool (default size) that every Job runs on.
POOL = FiberPool.new

# Stand-in "database": a single shared value, starting at 10.
DB = Struct.new(:value).new(10)

# Simulated I/O loop that decides when paused jobs get to continue.
IOLOOP = Io.new(0.01)

# Start 100 jobs with ids 0..99; each one begins running as soon as it is created.
jobs = (0...100).map { |id| Job.new(id) }

# Keep ticking the I/O loop until every job has gone through all its steps.
IOLOOP.run until jobs.all?(&:done?)

# 100 increments of 10 would give 110; the interleaved read/modify/write
# steps lose updates, which is the race being demonstrated.
puts "The following value should be 110: #{DB.value}"

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.