Created
April 11, 2010 19:46
-
-
Save Ahnfelt/363010 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This is an implementation of Software Transactional Memory.
# The idea is that when a variable is first used in an atomic block,
# we save a snapshot of its current value and version.
# When we reach the end of the transaction, we lock all variables
# that have been read or written to inside the atomic block. If any
# of the variables we read has been changed from the outside, we
# reset the transaction and rerun it from the beginning. Otherwise
# we write the new values back into the shared memory with a new
# version. For each variable, we maintain information on which other
# variables were updated to which versions in the same commit,
# which means we can roll back as soon as we read an inconsistent state.
# Although commits would roll such transactions back anyway, we
# might end up triggering an infinite loop or an exception before
# the actual commit if we read inconsistent state at any point.
# NOTES:
# - It only locks the variables that have been read or written to.
# - It only locks at the end of a transaction (not during).
# - Any exception other than RetryException that escapes the atomic
#   block causes a clean rollback of the transaction.
# - Don't catch RetryException or you'll break the transaction.
# - It uses Thread.current[:stm_transaction], so leave that alone.
require 'thread' | |
require 'weakref' | |
# Raised to abandon the current attempt of a transaction so that it is rerun.
# It derives from Exception rather than StandardError; presumably this keeps a
# bare `rescue` inside an atomic block from swallowing it (the file notes warn
# against catching it) -- confirm before changing the superclass.
class RetryException < Exception; end
# Records that a variable was brought to a given version by some commit.
# The variable is held through a WeakRef, so an Impact does not keep the
# variable alive; use +variable.weakref_alive?+ before dereferencing it.
class Impact
  attr_accessor :version, :variable

  # version:  the version number the variable received.
  # variable: the object to reference weakly.
  def initialize(version, variable)
    @version, @variable = version, WeakRef.new(variable)
  end
end
# One published snapshot of a variable: its version number, its value and the
# impact table (id => Impact) that was stored together with it.
class State
  attr_accessor :version, :value, :impact

  def initialize(version, value, impact)
    self.version = version
    self.value = value
    self.impact = impact
  end
end
# A shared memory cell. It carries an id (its own object_id), a Mutex and the
# currently published State. A fresh variable starts at version 0 with an
# impact table that mentions only itself.
class Variable
  attr_accessor :state, :mutex, :id

  def initialize(value)
    @id = object_id
    @mutex = Mutex.new
    initial_impact = { @id => Impact.new(0, self) }
    @state = State.new(0, value, initial_impact)
  end
end
# A top-level transaction, bound to the thread that created it.
#
# Atomic.new(map, block) runs +block+ until it commits and stores the block's
# result in map[:result]. When a transaction is already registered in
# Thread.current[:stm_transaction], the block is instead run as a
# subtransaction of that transaction.
#
# Per-attempt bookkeeping (all hashes are keyed by Variable#id):
#   @variables - every variable touched; these are the ones locked at commit
#   @versions  - the version seen when the variable was first touched
#   @values    - the transaction-local value of the variable
#   @reads     - variables whose first use was a read (validated at commit)
#   @writes    - variables that were written (published at commit)
class Atomic
  # map:   hash that receives the block's result under :result.
  # block: callable holding the transaction body.
  # Any exception other than RetryException escaping the block propagates to
  # the caller; nothing has been published at that point.
  def initialize(map, block)
    # If we're in a subtransaction, life is simpler
    enclosing = Thread.current[:stm_transaction]
    if enclosing
      enclosing.subtransaction(map, block)
      return
    end
    begin
      # Ensure that the transaction is only used from one thread
      @thread = Thread.current
      Thread.current[:stm_transaction] = self
      # `while true` rather than `loop`: Kernel#loop would silently swallow a
      # StopIteration raised by the block.
      while true
        # Every attempt starts from empty logs
        reset_logs
        begin
          map[:result] = block.call
        rescue RetryException
          next
        end
        # If there were no writes, we're safe and done
        break if @writes.empty?
        break if commit
      end
    ensure
      # This transaction becomes invalid when we leave (so no threads can use it)
      @thread = nil
      Thread.current[:stm_transaction] = nil
    end
  end

  # Runs +block+ nested inside this transaction and stores its result in
  # map[:result]. The logs are snapshotted first and restored whenever the
  # block does not finish normally. A RetryException reruns just this block,
  # unless it signals an inconsistent read: that can only be cured by
  # restarting the whole transaction, so it is passed on upwards.
  def subtransaction(map, block)
    while true
      # The snapshot is a local, so nested subtransactions cannot clobber it.
      saved = snapshot
      finished = false
      begin
        map[:result] = block.call
        finished = true
        return
      rescue RetryException
        # The versions recorded by the enclosing transaction are stale;
        # rerunning only this block would fail the same check forever.
        raise if @inconsistent
      ensure
        restore(saved) unless finished
      end
    end
  end

  # Returns the transaction-local value of +variable+, recording it as a read
  # dependency if this is the first time the transaction uses it.
  # Raises ThreadError when called from a thread other than the owner or
  # after the transaction has ended.
  def read(variable)
    check_thread
    # If we already wrote to the variable, it's not a new dependency
    unless @versions.has_key?(variable.id)
      touch(variable, true)
      @reads[variable.id] = variable
    end
    @values[variable.id]
  end

  # Sets the transaction-local value of +variable+ to +value+ and records the
  # write. Raises ThreadError under the same conditions as #read.
  def write(variable, value)
    check_thread
    # Writes must always be recorded
    unless @writes.has_key?(variable.id)
      touch(variable, false) unless @versions.has_key?(variable.id)
      @writes[variable.id] = variable
    end
    @values[variable.id] = value
  end

  # Registers the first use of +variable+ by snapshotting its current state.
  # When +read+ is true, the snapshot is also checked against the variables
  # read so far; RetryException is raised if its impact table says one of
  # them has moved past the version this transaction saw.
  def touch(variable, read)
    # Ensure that we use the same state for version and value
    # The assumption here is that reading variable.state never returns
    # a value that it didn't have at some point (which it could if the
    # scheduler and code generator were really, really strange).
    state = variable.state
    # Remember the variable itself so that commit knows what to lock.
    @variables[variable.id] = variable
    @versions[variable.id] = state.version
    @values[variable.id] = state.value
    return unless read
    # Ensure that the read state isn't inconsistent
    @reads.each_key do |id|
      if state.impact.has_key?(id) && state.impact[id].version > @versions[id]
        @inconsistent = true
        raise RetryException
      end
    end
  end

  private

  # Raises ThreadError unless the caller is the thread that owns the transaction.
  def check_thread
    return if @thread == Thread.current
    raise ThreadError, 'transaction is not active on the current thread'
  end

  # Clears all per-attempt bookkeeping.
  def reset_logs
    @variables = {}
    @versions = {}
    @values = {}
    @writes = {}
    @reads = {}
    @inconsistent = false
  end

  # Shallow copies of the five logs, in the order #restore expects.
  def snapshot
    [@variables, @versions, @values, @writes, @reads].map { |log| log.dup }
  end

  # Puts back the logs captured by #snapshot.
  def restore(saved)
    @variables, @versions, @values, @writes, @reads = saved
  end

  # Locks every touched variable, validates the reads and publishes the
  # writes. Returns true if the transaction was committed and false if a
  # variable that was read has changed version in the meantime.
  def commit
    held = []
    begin
      # Sort the variables before locking in order to avoid deadlocks
      @variables.values.sort_by { |variable| variable.id }.each do |variable|
        variable.mutex.lock
        held.push(variable)
      end
      # Check that none of the variables we depend on have new versions
      unchanged = @reads.all? { |id, variable| variable.state.version == @versions[id] }
      return false unless unchanged
      publish
      true
    ensure
      held.each { |variable| variable.mutex.unlock }
    end
  end

  # Installs a new State on every written variable. All of them share one
  # impact table holding the new versions plus the still-alive entries
  # inherited from the tables being replaced.
  def publish
    merged = {}
    @writes.each do |id, variable|
      merged[id] = Impact.new(variable.state.version + 1, variable)
    end
    # This probably has O(n*m) runtime or worse, where n is the number of
    # updated variables and m is the maximum number of live variables that
    # have ever been updated together with a variable from this transaction.
    @writes.each do |id, variable|
      # Note that we need the actual state and not the stored state,
      # since it is perfectly legal to overwrite a variable that hasn't
      # been read from during the transaction
      variable.state.impact.each do |other_id, impact|
        if !merged.has_key?(other_id) && impact.variable.weakref_alive?
          merged[other_id] = impact
        end
      end
      variable.state = State.new(variable.state.version + 1, @values[id], merged)
    end
  end
end
# Public handle for a transactional variable. Reads and writes are forwarded
# to the transaction registered in Thread.current[:stm_transaction].
class Wrapper
  # variable: the object handed to the transaction's read/write methods.
  def initialize(variable)
    @variable = variable
  end

  # Returns the variable's value as seen by the current transaction.
  # Raises ThreadError when no transaction is active on this thread.
  def value
    transaction.read(@variable)
  end

  # Sets the variable's value inside the current transaction.
  # Raises ThreadError when no transaction is active on this thread.
  def value=(v)
    transaction.write(@variable, v)
  end

  private

  # The transaction registered for the current thread. Failing here gives a
  # clear error instead of a NoMethodError on nil.
  def transaction
    Thread.current[:stm_transaction] ||
      raise(ThreadError, 'transactional variable accessed outside an atomic block')
  end
end
# The only API that should be exposed consists of the Wrapper methods and the following functions:
# Creates a new transactional variable holding +value+ and returns the
# Wrapper through which it is read and written.
def variable(value)
  cell = Variable.new(value)
  Wrapper.new(cell)
end
# Runs the given block as a transaction and returns the block's result.
# Atomic delivers the result through the hash passed to it.
def atomic(&block)
  outcome = {}
  Atomic.new(outcome, block)
  outcome[:result]
end
# Abandons the current attempt of the running transaction by raising
# RetryException.
def restart
  raise RetryException.new
end
# Example code
# Side effects (puts) inside the atomic blocks make a restart of transaction A
# visible: whenever the transaction is rerun, its output is printed again.
# Atomic blocks with side effects other than the manipulation of transactional
# variables have no clear semantics, so do not rely on them in production code.
x = variable(0)
y = variable(0)

thread_a = Thread.new do
  atomic do
    puts("Transaction A")
    puts(x.value)
    x.value = 42
    # Give the other thread a chance to run in the middle of this transaction.
    Thread.pass
    puts(y.value)
    y.value = 42
  end
end

thread_b = Thread.new do
  atomic do
    puts("Transaction B")
    x.value = 1
    y.value = 2
  end
end

[thread_a, thread_b].each { |worker| worker.join }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Updated to use object_id thanks to heftig on irc.freenode.net #ruby