Skip to content

Instantly share code, notes, and snippets.

@johan--
Forked from djellemah/queue_multiplex.rb
Last active August 29, 2015 14:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save johan--/1555f1a45c5d2bb4c6a3 to your computer and use it in GitHub Desktop.
Save johan--/1555f1a45c5d2bb4c6a3 to your computer and use it in GitHub Desktop.
require 'thwait'
# One-slot hand-off point between threads.
#
# A sender calling #value= blocks while the slot is occupied and returns as
# soon as its value has been stored. A receiver calling #value blocks while
# the slot is empty. #raise closes the hand-off: the stored exception is
# re-raised in receivers once the slot has been drained, and in senders
# right away, including senders that were already blocked.
class Rendevous
  def initialize
    @mutex = Mutex.new
    @send_flag = ConditionVariable.new # signalled when the slot is emptied
    @recv_flag = ConditionVariable.new # signalled when the slot is filled
    @container = []
    @exception = nil
  end

  # Remove and return the pending value, blocking until there is one.
  #
  # A value that is already stored is always delivered first; the exception
  # passed to #raise is only raised here when the slot is empty.
  def value
    @mutex.synchronize do
      while @container.empty?
        ::Kernel.raise @exception if @exception
        @recv_flag.wait @mutex
      end
      # Signalling before the pop is fine: the woken sender cannot continue
      # until the mutex is released, which only happens after the pop.
      @send_flag.signal
      @container.pop
    end
  end

  # Store +rhs+ for a receiver, blocking while a previous value is pending.
  #
  # Raises the exception passed to #raise if the hand-off has been closed,
  # whether that happened before the call or while it was blocked.
  def value=(rhs)
    @mutex.synchronize do
      # Deliberately not Kernel#loop: it would swallow a StopIteration.
      until @container.empty?
        ::Kernel.raise @exception if @exception
        @send_flag.wait @mutex
      end
      ::Kernel.raise @exception if @exception
      @container << rhs
      @recv_flag.signal
    end
  end

  # Close the hand-off. Accepts the same arguments as Kernel#raise; the
  # resulting exception is stored rather than raised in the calling thread,
  # and every blocked sender and receiver is woken so it can observe it.
  def raise(*args)
    @mutex.synchronize do
      @exception =
        begin
          ::Kernel.raise(*args)
        rescue ::Exception => e
          # Capture only: the begin body does nothing except build and raise
          # the requested exception, so nothing unrelated is swallowed, and
          # exception classes outside StandardError are stored as well.
          e
        end
      @send_flag.broadcast
      @recv_flag.broadcast
    end
  end
end
# Merge several queues into a single stream of items.
#
# One thread per queue pops items and hands them, one at a time, to the
# calling thread, which passes them to the block. A queue is considered
# finished when it yields a falsy item (nil or false); once every queue has
# finished, the iteration ends.
#
# The block may take one parameter (the item) or two (the item and the queue
# it came from). Without a block an enumerator yielding item and queue is
# returned. Raises RuntimeError for any other block arity.
#
# Naming note from the original author: "select" may be the wrong name,
# given what select means in POSIX.
#
# Completion is detected with Thread#join rather than ThreadsWait, so this
# method itself does not need the thwait library.
def Queue.select(*qs, &receiver)
  return enum_for(__method__, *qs) unless block_given?

  # Decide once how to call the receiver, and do so before any thread is
  # started so that an unsupported arity cannot leave threads behind.
  to_receiver =
    case receiver.arity
    when 1 then ->(item, _) { receiver.call item }
    when -1, 2 then ->(item, q) { receiver.call item, q }
    else
      raise "unknown arity for receiver #{receiver.arity}"
    end

  xchg = Rendevous.new

  consumer_threads = qs.map do |q|
    Thread.new do
      while (item = q.pop)
        xchg.value = [item, q]
      end
    end
  end

  # Once every queue has finished, wake the receiver waiting on xchg.
  waiter = Thread.new do
    consumer_threads.each do |thread|
      begin
        thread.join
      rescue StandardError
        # A failed consumer still counts as finished; keep waiting for the
        # remaining ones so the receiver is always released.
      end
    end
    xchg.raise StopIteration
  end

  begin
    # Kernel#loop ends quietly when xchg.value raises StopIteration.
    loop do
      item, q = xchg.value
      to_receiver.call item, q
    end
  ensure
    # If the receiver broke out or raised, the helper threads would stay
    # blocked forever; after a normal finish this is a no-op.
    (consumer_threads + [waiter]).each(&:kill)
  end
end
# Demo of Queue.select: three bounded queues are each filled by their own
# producer thread, which pushes a nil once it has sent all of its items.
# Every item received is printed together with its running index and the
# queue it came from.
def exercise
  q1 = SizedQueue.new 9
  q2 = SizedQueue.new 3
  q3 = SizedQueue.new 11

  Thread.new do
    12.times { |i| q1 << "q1 << #{i}" }
    q1 << nil
  end

  Thread.new do
    15.times { |i| q2 << "q2 << #{i}" }
    q2 << nil
  end

  Thread.new do
    7.times { |i| q3 << "q3 << #{i}" }
    q3 << nil
  end

  Queue.select(q1, q2, q3).each_with_index do |(item, q), index|
    puts [index, item, q].inspect
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment