@Ahnfelt
Created April 11, 2010 19:46
# This is an implementation of Software Transactional Memory.
# The idea is that when a variable is first used in an atomic block,
# we save a snapshot of its current value and version.
# When we reach the end of the transaction, we lock all variables
# that have been read or written to inside the atomic block. If any
# of the variables we read has been changed from the outside, we
# reset the transaction and rerun it from the beginning. Otherwise
# we write the new values back into the shared memory with a new
# version. Each variable also records which other variables were
# committed together with it, and at which versions, so a read that
# would give the transaction an inconsistent snapshot restarts it
# immediately. The commit-time check would reject such a transaction
# anyway, but without the early check the block could hit an infinite
# loop or an exception on inconsistent state before reaching the commit.
# NOTES:
# - It only locks the variables that have been read or written to.
# - It only locks at the end of a transaction (not during).
# - Any exception other than RetryException that escapes the atomic
# block causes a clean rollback of the transaction.
# - Don't catch RetryException or you'll break the transaction.
# - It uses Thread.current[:stm_transaction] so leave that alone.
require 'thread'
require 'weakref'
class RetryException < Exception
end
class Impact
attr_accessor :version, :variable
def initialize(version, variable)
@version = version
@variable = WeakRef.new(variable)
end
end
class State
attr_accessor :version, :value, :impact
def initialize(version, value, impact)
@version = version
@value = value
@impact = impact
end
end
class Variable
attr_accessor :state, :mutex, :id
def initialize(value)
@id = self.object_id
@mutex = Mutex.new
@state = State.new(0, value, {id => Impact.new(0, self)})
end
end
class Atomic
def initialize(map, block)
# If we're in a subtransaction, life is simpler
if Thread.current[:stm_transaction] != nil then
return Thread.current[:stm_transaction].subtransaction(map, block)
end
# Ensure that the transaction is only used from one thread
begin
@thread = Thread.current
Thread.current[:stm_transaction] = self
while true do
# If we're not done yet, restart the transaction
@variables = {}
@versions = {}
@values = {}
@writes = {}
@reads = {}
@retry = false
begin
map[:result] = block.call
rescue RetryException
@retry = true
end
if not @retry then
# If there were no writes, we're safe and done
if @writes.empty? then return end
# Sort the variables before locking in order to avoid deadlocks
sorted = @variables.values.sort {|a, b| a.id <=> b.id }
acquired = []
begin
sorted.each {|variable|
variable.mutex.lock()
acquired.push(variable)
}
safe = true
# Check that none of the variables we depend on have new versions
@reads.each {|id, variable|
if variable.state.version != @versions[id] then
safe = false
break
end
}
# Commit the transaction
if safe then
new_impact = {}
@writes.each {|id, variable|
new_impact[id] = Impact.new(variable.state.version + 1, variable)
}
# This probably has O(n*m) runtime or worse, where n is the number of
# updated variables and m is the maximum number of live variables that
# have ever been updated together with a variable from this transaction.
@writes.each {|id, variable|
# Note that we need the actual state and not the stored state,
# since it is perfectly legal to overwrite a variable that hasn't
# been read from during the transaction
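# Use a distinct block parameter name here: on Ruby 1.8, reusing `id`
# would overwrite the outer `id` that is still needed below.
variable.state.impact.each {|other_id, impact|
if not new_impact.has_key?(other_id) and impact.variable.weakref_alive? then
new_impact[other_id] = impact
end
}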
variable.state = State.new(variable.state.version + 1, @values[id], new_impact)
}
return
end
ensure
acquired.each {|variable|
variable.mutex.unlock()
}
end
end
end
ensure
# This transaction becomes invalid when we leave (so no threads can use it)
@thread = nil
Thread.current[:stm_transaction] = nil
end
end
def subtransaction(map, block)
def reset()
@variables = @backup[:variables]
@versions = @backup[:versions]
@values = @backup[:values]
@writes = @backup[:writes]
@reads = @backup[:reads]
end
def copy(hash)
Hash.new.update(hash)
end
begin
while true do
@backup = {}
@backup[:variables] = copy(@variables)
@backup[:versions] = copy(@versions)
@backup[:values] = copy(@values)
@backup[:writes] = copy(@writes)
@backup[:reads] = copy(@reads)
begin
map[:result] = block.call
@backup = nil
return
rescue RetryException
reset()
end
end
ensure
if @backup != nil then reset() end
end
end
def read(variable)
if @thread != Thread.current then raise ThreadError end
# If we already wrote to the variable, it's not a new dependency
if not @versions.has_key?(variable.id) then
touch(variable, true)
@reads[variable.id] = variable
end
return @values[variable.id]
end
def write(variable, value)
if @thread != Thread.current then raise ThreadError end
# Writes must always be recorded
if not @writes.has_key?(variable.id) then
if not @versions.has_key?(variable.id) then
touch(variable, false)
end
@writes[variable.id] = variable
end
@values[variable.id] = value
end
def touch(variable, read)
# Ensure that we use the same state for version and value
# The assumption here is that reading variable.state never returns
# a value that it didn't have at some point (which it could if the
# scheduler and code generator was really, really strange).
state = variable.state
@versions[variable.id] = state.version
@values[variable.id] = state.value
# Ensure that the read state isn't inconsistent
if read then
@reads.each_key {|id|
if state.impact.has_key?(id) and state.impact[id].version > @versions[id] then
raise RetryException
end
}
end
end
end
class Wrapper
def initialize(variable)
@variable = variable
end
def value
t = Thread.current[:stm_transaction]
t.read(@variable)
end
def value=(v)
t = Thread.current[:stm_transaction]
t.write(@variable, v)
end
end
# The only API that should be exposed is the Wrapper methods and the following few functions:
def variable(value)
Wrapper.new(Variable.new(value))
end
def atomic(&block)
map = {}
Atomic.new(map, block)
return map[:result]
end
def restart()
raise RetryException
end
# Example code
# We use side effects in the atomic block to illustrate the restart of transaction A.
# Note that the side effects are repeated when the transaction reruns. Atomic blocks
# with side effects (other than the manipulation of transactional variables) have no
# clear semantics, so it is important not to use them in production code.
x = variable(0)
y = variable(0)
t1 = Thread.new {
atomic {
puts("Transaction A")
puts(x.value)
x.value = 42
Thread.pass
puts(y.value)
y.value = 42
}
}
t2 = Thread.new {
atomic {
puts("Transaction B")
x.value = 1
y.value = 2
}
}
t1.join()
t2.join()
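
The commit protocol described in the header comment (snapshot a version on first use, lock in a fixed order, validate, then write back with a new version) can be sketched in isolation. The following is a minimal illustration only, not the code above: the names `TVar`, `MiniTx` and `mini_atomic` are hypothetical, it leaves out the impact tracking and subtransactions, it takes its snapshot under the variable's lock, and it validates every touched variable rather than only the ones that were read.

```ruby
# Hypothetical, simplified sketch of optimistic versioned commits.
class TVar
  attr_reader :lock
  attr_accessor :version, :value

  def initialize(value)
    @lock = Mutex.new
    @version = 0
    @value = value
  end
end

class MiniTx
  def initialize
    @seen = {}     # TVar => version observed on first use
    @values = {}   # TVar => value as seen or written by this transaction
    @written = {}  # TVar => true for variables assigned in this transaction
  end

  def read(tvar)
    snapshot(tvar)
    @values[tvar]
  end

  def write(tvar, value)
    snapshot(tvar)
    @written[tvar] = true
    @values[tvar] = value
  end

  # Returns true if the commit succeeded, false if the caller should rerun.
  def commit
    # Lock in a global order (object_id) so two commits cannot deadlock.
    vars = @seen.keys.sort_by(&:object_id)
    vars.each { |v| v.lock.lock }
    begin
      # Validate: every touched variable must still be at the version we saw.
      return false unless vars.all? { |v| v.version == @seen[v] }
      @written.each_key do |v|
        v.value = @values[v]
        v.version += 1
      end
      true
    ensure
      vars.each { |v| v.lock.unlock }
    end
  end

  private

  def snapshot(tvar)
    return if @seen.key?(tvar)
    # Read version and value under the lock so they belong together.
    tvar.lock.synchronize do
      @seen[tvar] = tvar.version
      @values[tvar] = tvar.value
    end
  end
end

# Rerun the block with a fresh transaction until a commit succeeds.
def mini_atomic
  loop do
    tx = MiniTx.new
    result = yield tx
    return result if tx.commit
  end
end

counter = TVar.new(0)
threads = 4.times.map do
  Thread.new do
    100.times { mini_atomic { |tx| tx.write(counter, tx.read(counter) + 1) } }
  end
end
threads.each(&:join)
puts counter.value
```

Because a commit only succeeds when the counter is still at the version the transaction first saw, no increment is lost, and the final value equals the total number of increments (400 here). Validating writes as well as reads is a simplification; the implementation above deliberately allows overwriting a variable that was never read.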

Ahnfelt commented Apr 11, 2010

Updated to use object_id thanks to heftig on irc.freenode.net #ruby