Skip to content

Instantly share code, notes, and snippets.

@seki
Last active August 29, 2015 14:02
Show Gist options
  • Save seki/4334968b9444e0b71af1 to your computer and use it in GitHub Desktop.
Save seki/4334968b9444e0b71af1 to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
require 'rinda/tuplespace'
module Channel
class ChannelSpace
class Any
def initialize(ary)
@ary = ary
end
def ===(ot)
@ary.include?(ot)
end
end
class Handle
def initialize(cs)
@cs = cs
end
def write(value)
@cs.write(self, value)
end
def read
@cs.read(self)
end
end
def initialize
@ts = Rinda::TupleSpace.new
end
def open(size=0)
chan = Handle.new(self)
@ts.write([:stream, chan, []])
size.times do
@ts.write([:writable, chan])
end
chan
end
def write(chan, value)
@ts.write([:wait_writable, chan])
@ts.take([:writable, chan])
@ts.take([:wait_writable, chan])
_, _, ary = @ts.take([:stream, chan, nil])
ary.push(value)
value
ensure
@ts.write([:stream, chan, ary])
@ts.write([:readable, chan])
end
def read(chan)
@ts.write([:writable, chan])
@ts.take([:readable, chan])
_, _, ary = @ts.take([:stream, chan, nil])
ary.shift
ensure
@ts.write([:stream, chan, ary])
end
def select(chan_set)
_, chan = @ts.read([Any.new([:readable, :wait_writable]),
Any.new(chan_set)])
chan
end
def after(sec, value=nil)
chan = open
Thread.new {sleep sec; chan.write(value)}
chan
end
end
end
cs = Channel::ChannelSpace.new
def f(chan, n)
sleep(rand * 10)
p [:f_begin, n]
chan.write([:f, n])
p [:f_end, n]
end
ary = (0..3).collect do |n|
chan = cs.open
Thread.new(n) {|n| f(chan, n)}
chan
end
timeout = cs.after(5, :timeout)
ary << timeout
ary.size.times do
chan = cs.select(ary)
p chan.read
break if chan == timeout
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment