Skip to content

Instantly share code, notes, and snippets.

@seki

seki/channel_space.rb

Last active Aug 29, 2015
Embed
What would you like to do?
require 'rinda/tuplespace'
class ChannelSpace
include DRbUndumped
class ChannelError < RuntimeError
def initialize(str, handle)
@channel = handle
super(str)
end
attr_reader :channel
end
class Readable
def ===(it)
it.readable?
end
end
class Any
def initialize(ary)
@ary = ary
end
def ===(it)
@ary.include?(it)
end
end
class Channel
def initialize(ts, size, handle)
@ts = ts
@handle = handle
@reader = []
@writer = []
@buf = []
@size = size
@closed = false
end
def readable?
_readable? || @closed
end
def _readable?
@buf.size + @writer.size > 0
end
def close
@closed = true
while writer = @writer.shift
@ts.write([:write, @handle, writer[1], self])
end
while reader = @reader.shift
@ts.write([:read, @handle, reader, self])
end
end
def writer_shift
return if @writer.empty?
value, key = @writer.shift
@buf << value
@ts.write([:write, @handle, key, value])
end
def buf_push(value)
@writer << [value, Thread.current]
if @buf.size < @size
writer_shift
end
end
def buf_shift
writer_shift
@buf.shift
end
def req_read
if _readable?
@ts.write([:read, @handle, Thread.current, buf_shift])
else
raise ChannelError.new('closed channel.', @handle) if @closed
@reader << Thread.current
end
end
def req_write(value)
raise ChannelError.new('closed channel.', @handle) if @closed
buf_push(value)
return if @reader.empty?
@ts.write([:read, @handle, @reader.shift, buf_shift])
end
end
class Handle
include DRbUndumped
def initialize(cs)
@cs = cs
end
def write(value)
@cs.write(self, value)
end
def read
@cs.read(self)
end
def close
@cs.close(self)
end
end
def initialize
@ts = Rinda::TupleSpace.new
@readable = Readable.new
end
def open(size=0)
handle = Handle.new(self)
@ts.write([handle, Channel.new(@ts, size, handle)])
handle
end
def close(handle)
begin
_, chan = @ts.take([handle, nil])
chan.close
ensure
@ts.write([handle, chan])
end
end
def write(handle, value)
begin
_, chan = @ts.take([handle, nil])
chan.req_write(value)
ensure
@ts.write([handle, chan])
end
_, _, _, value = @ts.take([:write, handle, Thread.current, nil])
raise ChannelError.new('closed channel.', handle) if value == chan
value
end
def read(handle)
begin
_, chan = @ts.take([handle, nil])
chan.req_read
ensure
@ts.write([handle, chan])
end
_, _, _, value = @ts.take([:read, handle, Thread.current, nil])
raise ChannelError.new('closed channel.', handle) if value == chan
value
end
def select(set)
handle, _ = @ts.read([Any.new(set), @readable])
handle
end
def select_and_read(set)
begin
handle, chan = @ts.take([Any.new(set), @readable])
chan.req_read
ensure
@ts.write([handle, chan])
end
_, _, _, value = @ts.take([:read, handle, Thread.current, nil])
raise ChannelError.new('closed channel.', handle) if value == chan
return handle, value
end
def after(sec, value=nil)
chan = open
Thread.new {sleep sec; chan.write(value)}
chan
end
@cs = self.new
def self.open(size=0)
@cs.open(size)
end
def self.after(sec, value=nil)
@cs.after(sec, value)
end
def self.select(set)
@cs.select(set)
end
def self.select_and_read(set)
@cs.select_and_read(set)
end
end
if __FILE__ == $0
def f(chan, n)
sleep(rand * 10)
p [:f_begin, n]
chan.write([:f, n])
p [:f_end, n]
chan.close
end
ary = (0..3).collect do |n|
chan = ChannelSpace.open(0)
Thread.new(n) {|x| loop {f(chan, x)}}
chan
end
timeout = ChannelSpace.after(10, :timeout)
ary << timeout
loop do
chan = ChannelSpace.select(ary)
begin
p chan.read
rescue ChannelSpace::ChannelError
p :closed
ary.delete(chan)
end
break if chan == timeout
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment