Skip to content

Instantly share code, notes, and snippets.

@djellemah
Last active March 21, 2016 12:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djellemah/3f068006775d3dd9a735 to your computer and use it in GitHub Desktop.
Save djellemah/3f068006775d3dd9a735 to your computer and use it in GitHub Desktop.
One-shot thread-safe value store with blocking semantics.
# One-shot thread-safe value store with blocking explicit semantics, aka
# Future/Promise/IVar https://en.wikipedia.org/wiki/Futures_and_promises.
#
# In a sense, it's a state machine pattern for 3 states (fresh/unset,
# valued/set and exception). Apparently this is the same set of states as the
# PromiseA+ specification. Who knew?
#
# It can be initialised with a value, or the value can be assigned later on.
#
# You use it like this:
#
# inst = OneTime.new
#
# These threads will wait
#
# Thread.new{inst.value}
# Thread.new{inst.value}
#
# now set a value, and the threads will wake up
#
# inst.value = :we_have_a_value!
#
# But if you do this instead
#
# inst.raise 'the critical server died and went to the cloud'
#
# then the threads will wake up to an exception.
#
class OneTime
# ASSUMPTION initialize will never have more than one thread per instance
# executing concurrently.
def initialize( *args )
# @mutex and @flag set in Fresh.extended hook, below.
case args.size
when 0
extend Fresh
when 1
@value = args.first
extend Valued
else
::Kernel.raise ArgumentError, "can only pass 0 or 1 arguments"
end
end
# only for extended. Will cause havoc if called at the wrong time.
def destruct_machinery
remove_instance_variable :@mutex if instance_variable_defined? :@mutex
remove_instance_variable :@flag if instance_variable_defined? :@flag
end
# only for extended. Will do unnecessary work if called at the wrong time.
def construct_machinery
@mutex = Mutex.new
@flag = ConditionVariable.new
end
# define access methods in the modules so we don't have to synchronize them
# unnecessarily. The module-switching is synchronized anyway, so that
# approach is still thread-safe.
# ASSUMPTION extending an instance with a module is threadsafe.
# has a value, so no need for synchronisation, and it can't be set again.
module Valued
attr_reader :value
def assigned?; true end
# would be nice to alias these, but can't because raise_assigned
# is defined in OneTime not in this module.
def value=(_) raise_assigned end
def raise(*_) raise_assigned end
def self.extended( base ); base.destruct_machinery end
end
# has an exception, so no need for synchronisation, and it can't be set again.
module Exceptional
def value; Kernel::raise @value; end
def assigned?; true end
def value=(_) raise_assigned end
def raise(*_) raise_assigned end
def self.extended( base ); base.destruct_machinery end
end
# doesn't have a value yet, so we need the synchronisation machinery.
module Fresh
def assigned?; false end
# This method is left lying around after the module switch. Which is a bit eww.
private def set_value( the_value, mod )
@mutex.synchronize do
# this happens when two threads contend to set the the_value.
raise_assigned if assigned?
@value = the_value
# Since the value won't change again, replace the synchronisation
# because it's no longer necessary. Safe to do this here, because
# other waiting threads will be waiting on @mutex.synchronize. So
# they'll just finish in their current method, and any subsequent
# calls will just use the un-synchronised accessors
flag = @flag
extend mod
freeze
# all waiters can now have access to the value they've been waiting for.
flag.broadcast
end
end
# All threads waiting on .value will wake up to the value
def value=( the_value )
set_value the_value, Valued
end
# All threads waiting on .value will wake up to an exception. Same idea
# as Thread#value.
# arguments same as Kernel.raise(string|Exception, [string, [backtrace]])
# 'raise' clashes with Kernel.raise but its worthwhile because it fits in with
# Thread, so classes using this can treat this as a promise and Thread as a future.
def raise( *args )
# parser chokes without ( ... )
set_value( (::Kernel.raise(*args) rescue $!), Exceptional )
end
# threads calling this before the value is assigned will be put to sleep
# here until there is a value. Semantics are same as Thread#value, ie will
# wait for the value, or raise an exception
#
# In the worst case a thread will wait on @flag because assigned? returns
# false, but in the meantime @value has just been set. But that's OK because
# the thread will be woken by the broadcast, which happens after the module
# switch (which also changes assigned to true).
def value
@mutex.synchronize do
@flag.wait @mutex unless assigned?
# method definition will be different by now because of module switch.
# So lean on those to return / raise as appropriate.
value
end
end
def self.extended( base )
# Do this here rather than in OneTime constructor because they're not
# used by the Valued and Exceptional states of OneTime.
# It's safe because extend is called from initialize. Which is thread-
# safe except for (possible?) interference from ObjectSpace.
base.construct_machinery
end
end
def raise_assigned(*args)
Kernel::raise ArgumentError, "value already set to #{@value.inspect}"
end
def empty?; !assigned?; end
end
require 'one_time.rb'
describe OneTime do
before :all do
@mutex = Mutex.new
end
def subject
@mutex.synchronize do
@subject ||= OneTime.new
end
end
describe '.new' do
it 'has value after' do
inst = OneTime.new :hi_there
inst.value.should == :hi_there
end
it 'fails on > 1 argument' do
->{OneTime.new :one, :two}.should raise_error(ArgumentError)
end
it 'has no value' do
subject.should_not be_assigned
subject.should be_empty
end
end
describe '#assigned?' do
it 'false when fresh' do
subject.should_not be_assigned
end
it 'true after raise' do
subject.raise 'oops'
subject.should be_assigned
end
it 'true after value=' do
subject.value = 'not oops'
subject.should be_assigned
end
end
describe '#value=' do
it 'has value after assignment' do
subject.value = :some_value
subject.value.should == :some_value
end
it 'can only assign once' do
subject.value = :some_value
rand(1..15).times do
->{subject.value = :other_value}.should raise_error(ArgumentError)
end
end
it 'assignment of exception does not raise' do
subject.value = RuntimeError.new 'ya did wrang'
->{subject.value}.should_not raise_error
end
end
describe 'threads' do
it 'threads wait for value' do
# subject is not thread-safe, so just use a local
one_time = OneTime.new
random_thread_count = rand 2..105
threads = random_thread_count.times.map{ Thread.new{one_time.value} }
# need a little wait for threads to start waiting on the value
sleep 0.00001 while threads.count{|t| t.status == 'sleep'} < 2
# set the value
one_time.value = :long_awaited
# and check that other threads woke up and got it.
threads.each do |thr|
thr.value.should == :long_awaited
thr.status.should == false
end
end
it 'all threads but one fail assignment' do
# subject is not thread-safe, so just use a local
one_time = OneTime.new
random_thread_count = rand 100..200
threads = random_thread_count.times.map do |i|
Thread.new{ one_time.value = i; :assigned_ok }
end
results = threads.map{|thr| thr.value rescue $!}
results.count{|v| v == :assigned_ok}.should == 1
results.count{|v| v.is_a? Exception}.should == random_thread_count - 1
end
it 'threads get exceptions' do
one_time = OneTime.new
threads = rand(2..105).times.map{ Thread.new{one_time.value} }
# set the value
one_time.raise RuntimeError, 'oops'
# and check that other threads woke up and got it.
threads.each do |thr|
->{thr.value}.should raise_error(RuntimeError, 'oops')
end
end
end
describe '#raise' do
it '()' do
subject.raise
->{subject.value}.should raise_error(RuntimeError,'')
end
it '(string)' do
subject.raise 'oops'
->{subject.value}.should raise_error(RuntimeError,'oops')
end
it '(exception_instance)' do
subject.raise RuntimeError.new('hereyago')
->{subject.value}.should raise_error(RuntimeError, 'hereyago')
end
it '(Exception,message)' do
subject.raise RuntimeError, 'grumpy crunchies'
->{subject.value}.should raise_error(RuntimeError, 'grumpy crunchies')
end
end
describe 'modules' do
it 'has synchronisation before assign' do
subject.method(:value).owner.should == OneTime::Fresh
subject.method(:value=).owner.should == OneTime::Fresh
end
it 'has no synchronisation after assign' do
subject.value = :some_value
subject.method(:value).owner.should == OneTime::Valued
subject.method(:value=).owner.should == OneTime::Valued
end
it 'Fresh has same methods as Valued' do
OneTime::Fresh.instance_methods.sort.should == OneTime::Valued.instance_methods.sort
OneTime::Fresh.instance_methods.each do |method_name|
fresh_method = OneTime::Fresh.instance_method method_name
valued_method = OneTime::Valued.instance_method method_name
fresh_method.arity.should == valued_method.arity
end
end
end
end
require 'one_time.rb'
require 'delegate.rb'
# Waiter-on, geddit? Hurhurhur.
#
# Use it like this:
#
# w = Waitron.new
# Thread.new{puts w + 10}
# Thread.new{puts w + 20}
#
# and then you will see nothing on stdout.
#
# w._ = 9
#
# and then you will see 1929 on stdout.
#
# This object's job is to be a very transparent delegator for a value store,
# defaulting to OneTime which is single-assigment with blocking semantics. The
# idea being that if you already have some complex algorithm in terms of ordinary
# objects, just instantiate these instead and pass them in.
#
# When they're used to wrap Thread instances, the calculation becomes automatically multi-threaded.
class Waitron < Delegator
# 0 or 1 arguments.
def initialize( *args, storage: ::OneTime)
@storage = storage
@wrapped =
case args.size
when 0
storage.new
when 1
case arg1 = args.first
when storage, ::Thread
# can wait on one of these, so just reference it.
arg1
else
# the value, which we will end up not waiting on.
storage.new arg1
end
else
raise ArgumentError, 'Waitron.new takes 0 or 1 arguments'
end
end
# I'm not sure this is a good idea.
def self.new( *args )
if args.size == 1 && (obj=args.first).is_a?(Waitron)
obj
else
super
end
end
# make several
def self.[]( *objs )
objs.map do |obj|
new obj
end
end
# make several Waitrons
def self.instances( count )
count.times.map{new}
end
# Implementation for Delegator
def __setobj__( value )
@wrapped.value = value
end
def __getobj__
@wrapped.value
end
# handle a bit of self-schizophrenia here
def nil?; __getobj__.nil? end
# In Prolog, Scala and some other languages, _ effectively means 'unbound
# variable' which kinda fits here as a nice way than __setobj__ to set the value.
module UnderscoreAccessor
def _=(*args); __assign__ *args; end
def _; __getobj__; end
end
include UnderscoreAccessor
# true if the object has a value, false otherwise. Non-blocking.
#
# I'm sure *somebody* will find a way to misuse this to avoid blocking...
def __assigned?
# this is quite horrible, but short of reopening Thread, I can't see another way to do it.
# Lots of things respond_to? :value, so can't rely on that.
case @wrapped
when ::Thread
# ie normal termination or exception
@wrapped.status == false || @wrapped.status == nil
when @storage
@wrapped.assigned?
else
raise "Don't know how to figure out assigned value for #{@wrapped.inspect}"
end
end
# be nice to pry and other REPLs
def inspect
if __assigned?
"#<Waitron:0x#{'%x' % __id__} #{__getobj__.inspect}>" rescue "#<Waitron:0x#{'%x' % __id__} #{$!.inspect}>"
else
"#<Waitron:0x#{'%x' % __id__} unset>"
end
end
# have to handle this otherwise pry (and maybe irb) will block
def pretty_print(pp)
if __assigned?
super
else
pp.text inspect
end
end
# for awesome_print
def ai; inspect end
protected
# This allows assigning to an Exception instance, which will cause accesses
# to raise. Like Thread#value.
# Will need to be overridden unless storage respond_to? :raise
def __assign__( *args )
if (ex = args.first).is_a? Exception
@wrapped.raise ex
else
__setobj__ *args
end
self
end
end
class Object
def to_waitron
Waitron.new self
end
end
class Thread
def self.waitron( *args, &block )
Waitron.new new( *args, &block )
end
end
module Kernel
def waitron( *args )
case args.size
when 0; Waitron.new
when 1; Waitron.new args.first
else; Waitron[args]
end
end
def thread(*args, &block)
Waitron.new Thread.new(*args, &block)
end
end
class Waitron
def to_waitron
self
end
end
require 'waitron.rb'
# turn off the "old syntax" warnings
RSpec.configure do |config|
config.mock_with :rspec do |c|
c.syntax = [:should, :expect]
end
config.expect_with :rspec do |c|
c.syntax = [:should, :expect]
end
end
describe Waitron do
it 'network' do
x,y,z = 3.times.map{Waitron.new}
t = Thread.new{(x + y + z) * (x - y - z)}
t.status.should == 'run'
x._ = 8
y._ = 2
z._ = 1
# this must block
t.value.should == 55
t.value.should == 55
t.status.should == false
t.value.should == 55
end
it 'concurrent updates' do
x,y,z = Waitron.instances 3
t = Thread.new{(x + y + z) * (x - y - z)}
sleep 0.001
t.status.should == 'sleep'
Thread.new{sleep rand; x._ = 8}
Thread.new{sleep rand; y._ = 2}
Thread.new{sleep rand; z._ = 1}
(t.alive? && t.stop?).should == true
# this should block
t.value.should == 55
end
it 'associative' do
x,y,z = Waitron.instances 3
x._ = 6
y._ = x
z._ = y
x.should == y
x.should == z
y.should == x
y.should == z
z.should == x
z.should == y
z.should == 6
6.should == z
end
it 'wraps a Thread' do
w = Waitron.new Thread.new{:through_value}
w.__getobj__.should == :through_value
w._.should == :through_value
end
it 'new does not wrap other waitron' do
w = Waitron.new :the_relevant_value
x = Waitron.new w
x.object_id.should == w.object_id
x.should == w
end
it 'calculation' do
future_value = Waitron.new Thread.new{sleep rand; 120034.0}
present_value = Waitron.new
projected_value = Waitron.new
interest = Waitron.new
# get percentage and multiply by projected value and interest
calc = Thread.new{(( future_value / present_value ) - 1) * projected_value * (1+interest)}
# wait a moment for thread to sleep
sleep 0.001
calc.status.should == 'sleep'
# raise exception if something goes wrong
Thread.new do
present_value._ = 100000
projected_value._ = 150000
interest._ = 0.08
end.join
calc.value.should == 32455.079999999994
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment