Created
May 24, 2017 20:08
-
-
Save seki/648ad29eb71c73858141c1416f9e0cc9 to your computer and use it in GitHub Desktop.
Fibered TupleSpace
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# frozen_string_literal: false | |
require 'rinda/tuplespace' | |
require 'bartender/bartender' | |
module Bartender | |
class Queue | |
def initialize | |
@reader = [] | |
@queue = [] | |
end | |
def push(it) | |
if @reader.empty? | |
@queue << it | |
else | |
@reader.shift.call(it) | |
end | |
end | |
def pop | |
if @queue.empty? | |
@reader << Fiber.current.method(:resume) | |
Fiber.yield | |
else | |
@queue.shift | |
end | |
end | |
end | |
end | |
module Rinda | |
class Event | |
def initialize | |
@waiter = [] | |
end | |
def wait | |
@waiter << Fiber.current.method(:resume) | |
Fiber.yield | |
end | |
def signal | |
return if @waiter.empty? | |
@waiter.shift.call | |
end | |
end | |
class NotifyTemplateEntry < TemplateEntry | |
def initialize(place, event, tuple, expires=nil) | |
ary = [event, Rinda::Template.new(tuple)] | |
super(ary, expires) | |
@queue = Bartender::Queue.new | |
@done = false | |
end | |
end | |
class TupleSpaceMono < TupleSpace | |
def initialize(period=5) | |
@bag = TupleBag.new | |
@read_waiter = TupleBag.new | |
@take_waiter = TupleBag.new | |
@notify_waiter = TupleBag.new | |
@period = period | |
@keeper = nil | |
end | |
def new_cond | |
Event.new | |
end | |
def synchronize | |
yield | |
end | |
private | |
def start_keeper | |
return if @keeper && @keeper.alive? | |
@keeper = Fiber.new do | |
while true | |
Bartender::sleep(@period) | |
synchronize do | |
break unless need_keeper? | |
keep_clean | |
end | |
end | |
end | |
@keeper.resume | |
end | |
end | |
end | |
if __FILE__ == $0 | |
ts = Rinda::TupleSpaceMono.new | |
p ts.instance_variable_get(:@mon_mutex) | |
Fiber.new do | |
while t = ts.take([:req, nil, nil], 10) | |
sleep(rand(2.3)) | |
ts.write([:ans, t[1] + t[2]]) | |
end | |
end.resume | |
Fiber.new do | |
while true | |
p ts.take([:ans, nil]) | |
end | |
end.resume | |
Fiber.new do | |
notify = ts.notify('write', [:ans, nil]) | |
notify.each do |x| | |
p x | |
end | |
end.resume | |
Fiber.new do | |
reader = Bartender::Reader.new($stdin) | |
while true | |
a = reader.readln.chomp! | |
b = reader.readln.chomp! | |
ts.write([:req, a, b]) | |
end | |
end.resume | |
Bartender.run | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment