@mumoshu
Last active October 13, 2017 23:59
goroutines in Ruby

Let's implement a "Koroutine" (a coined word): something coroutine-like that is neither a goroutine nor a coroutine.

A Koroutine is a unit of work that is lighter than a Thread and can run concurrently. Koroutines and the main thread communicate with each other through Channels.
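The core mechanism can be shown in isolation: a fiber suspends itself by yielding the channel it wants to read from, and whoever drives the fiber resumes it with the message. The following is only a minimal sketch under simplifying assumptions; `TinyChannel` and `run_fiber` are hypothetical names that do not appear in the gist, there is no worker thread or locking, and the messages are written before the fiber starts.

```ruby
# Hypothetical, stripped-down channel: the mailbox is a plain Array.
class TinyChannel
  def initialize
    @mailbox = []
  end

  def write(message)
    @mailbox.push(message)
  end

  def ready?
    !@mailbox.empty?
  end

  def take
    @mailbox.shift
  end

  # Called inside a fiber: suspend it and hand this channel to the driver.
  # The value later passed to Fiber#resume becomes the return value of read.
  def read
    Fiber.yield(self)
  end
end

# Drive one fiber to completion (hypothetical helper).
# While the fiber keeps yielding channels, feed it the next buffered message.
# The first non-channel value is taken to be the block's return value.
def run_fiber(fiber)
  result = fiber.resume
  while result.is_a?(TinyChannel)
    raise "no message available on #{result}" unless result.ready?
    result = fiber.resume(result.take)
  end
  result
end

ch = TinyChannel.new
ch.write("mikoto")
ch.write("kuroko")

fiber = Fiber.new do
  first = ch.read
  second = ch.read
  "#{first}x#{second}"
end

joined = run_fiber(fiber)
puts joined
```

The full implementation replaces the loop in `run_fiber` with tasks submitted to a worker thread, so a fiber whose channel is still empty does not hold up the others.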

Below is a sample run. The relationships are Thread 1...1 KoroutineSystem 1...2 Koroutine 1...1 Fiber, so you can see multiple Koroutines being executed on a single thread.

# Koroutine :one starts
Resuming for the first time
The fiber resumed at first time
# Koroutine :one suspends to wait on ch1... but ch1 already holds a message, so it resumes immediately
Asynchronously waiting for any message to arrive: #<Channel:0x007fde8a82bd40>
Resumed with #<Channel:0x007fde8a82bd40>
# The context switches from :one to :two here
# Koroutine :two starts
Resuming for the first time
The second fiber resumed at first time
# :two writes to ch2
The second fiber wrote to ch2
The fiber should have been exited
# The context switches from :two back to :one here
The fiber resumed with the first message: mikoto
Asynchronously waiting for any message to arrive: #<Channel:0x007fde8a82b9a8>
The fiber resumed with the second message: kuroko
The fiber should have been exited with value: #<ConditionVariable:0x007fde8a82b480>
mikotoxkuroko
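The trace above depends on one rule: a message written before anyone is waiting is buffered, and a reader that arrives at an empty channel is parked until the next write. A minimal sketch of that rule follows, assuming a single thread and therefore no locking; `RendezvousChannel` is a hypothetical name, and unlike the gist's `Channel` it hands each message to exactly one listener.

```ruby
# Hypothetical single-threaded channel built on plain Arrays.
class RendezvousChannel
  def initialize
    @mailbox = []    # messages written while nobody was waiting
    @listeners = []  # blocks waiting for a message
  end

  # Deliver to the oldest waiting listener if there is one, otherwise buffer.
  def write(message)
    if @listeners.empty?
      @mailbox.push(message)
    else
      @listeners.shift.call(message)
    end
  end

  # Run the block right away if a message is buffered, otherwise park it.
  def on_read(&block)
    if @mailbox.empty?
      @listeners.push(block)
    else
      block.call(@mailbox.shift)
    end
  end
end

received = []

early = RendezvousChannel.new
early.write("mikoto")                          # buffered: nobody is listening yet
early.on_read { |m| received << [:early, m] }  # fires immediately

late = RendezvousChannel.new
late.on_read { |m| received << [:late, m] }    # parked: the mailbox is empty
late.write("kuroko")                           # wakes the parked listener

p received
```

The `early` case corresponds to `ch1` in the trace, where "mikoto" was written before Koroutine :one asked for it.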
require 'thread'
require 'monitor'
# threads block, fibers yield
# channel notifies a thread to resume the fiber
Thread.abort_on_exception = true
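class Channel
  include MonitorMixin

  def initialize
    super() # lets MonitorMixin set up its lock
    @mailbox = Queue.new
    @listeners = Queue.new
  end

  # Hand the message to every waiting listener, or buffer it if nobody is waiting.
  def write(message)
    synchronize do
      if @listeners.empty?
        @mailbox.push message
      else
        # Drain without blocking: a bare Queue#pop would wait forever once the queue is empty.
        until @listeners.empty?
          @listeners.pop.call message
        end
      end
    end
  end

  # Called inside a fiber: suspend it and hand this channel to the scheduler.
  def read
    puts "Asynchronously waiting for any message to arrive: #{self}"
    Fiber.yield self
  end

  # Called from a plain thread: block until a message arrives.
  def await
    @mailbox.pop
  end

  # Run the block with a buffered message now, or register it for the next write.
  def on_read(&block)
    synchronize do
      if @mailbox.size > 0
        block.call @mailbox.pop
      else
        @listeners.push block
      end
    end
  end
end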
class ThreadExecutor
  def initialize
    @queue = Queue.new
    # A single worker thread runs the submitted tasks one at a time, in FIFO order.
    @thread = Thread.new do
      loop do
        task = @queue.pop
        task.call
      end
    end
  end

  def submit(&block)
    @queue.push block
  end
end
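class Initialize
  def initialize(args)
    @koroutine = args[:koroutine]
    @executor = args[:executor]
  end

  def run
    # Create the fiber on the executor thread, because a fiber can only be resumed by the thread that created it.
    @executor.submit do
      StartFiber.new(fiber: @koroutine.to_fiber, executor: @executor).run
    end
  end
end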
class StartFiber
  def initialize(args)
    @fiber = args[:fiber]
    @executor = args[:executor]
    raise "Unexpected class of fiber: #{@fiber.class}" unless @fiber.is_a? Fiber
  end

  def run
    @executor.submit do
      puts "Resuming for the first time"
      channel = @fiber.resume
      if channel.is_a? Channel
        # The fiber yielded a channel from Channel#read; resume it once a message is available.
        puts "Resumed with #{channel}"
        ResumeFiber.new(fiber: @fiber, channel: channel, executor: @executor).run
      else
        # Any other value is the block's return value, so the fiber has finished.
        puts "The fiber should have been exited"
      end
    end
  end
end
class ResumeFiber
  def initialize(args)
    @fiber = args[:fiber]
    @channel = args[:channel]
    @executor = args[:executor]
  end

  def run
    # Wait for a message on the channel the fiber yielded, then resume the fiber on the executor thread.
    @channel.on_read do |message|
      @executor.submit do
        channel = @fiber.resume message
        if channel.is_a? Channel
          # The fiber is now waiting on another channel; schedule the next resume.
          @executor.submit do
            ResumeFiber.new(fiber: @fiber, channel: channel, executor: @executor).run
          end
        else
          # Any other value is the block's return value (for example whatever Channel#write returned), so the fiber has finished.
          puts "The fiber should have been exited with value: #{channel}"
        end
      end
    end
  end
end
class KoroutineSystem
  def initialize
    @executor = ThreadExecutor.new
  end

  def ko(name, &block)
    koroutine = Koroutine.new(name: name, &block)
    @executor.submit do
      Initialize.new(koroutine: koroutine, executor: @executor).run
    end
  end
end
class Koroutine
  def initialize(args, &block)
    @name = args[:name]
    @block = block
  end

  def to_fiber
    Fiber.new(&@block)
  end
end
sys = KoroutineSystem.new
ch1 = Channel.new
ch2 = Channel.new
ch3 = Channel.new

ch1.write "mikoto"

sys.ko :one do
  puts "The fiber resumed at first time"
  msg1 = ch1.read
  puts "The fiber resumed with the first message: #{msg1}"
  msg2 = ch2.read
  puts "The fiber resumed with the second message: #{msg2}"
  ch3.write "#{msg1}x#{msg2}"
end

sys.ko :two do
  puts "The second fiber resumed at first time"
  ch2.write "kuroko"
  puts "The second fiber wrote to ch2"
end

puts ch3.await
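The script ends by blocking the main thread on `ch3.await` while a single worker thread runs every Koroutine step. That hand-off can be sketched separately as below. `SingleThreadExecutor` is a hypothetical stand-in that mirrors `ThreadExecutor`, and the `log` and `done` queues are illustrative names, not part of the gist.

```ruby
require 'thread'

# Hypothetical stand-in for ThreadExecutor: one worker thread, one FIFO queue.
class SingleThreadExecutor
  def initialize
    @queue = Queue.new
    @thread = Thread.new do
      loop do
        task = @queue.pop  # blocks until a task is submitted
        task.call
      end
    end
  end

  def submit(&block)
    @queue.push(block)
  end
end

executor = SingleThreadExecutor.new
log = Queue.new   # thread-safe record of which task ran on which thread
done = Queue.new  # plays the role of ch3: the main thread blocks on it

3.times do |i|
  executor.submit { log.push([i, Thread.current.object_id]) }
end
executor.submit { done.push(:finished) }

done.pop  # blocks the main thread until the last task has run

entries = []
entries << log.pop until log.empty?

order = entries.map(&:first)
thread_ids = entries.map(&:last).uniq

p order
puts "ran on #{thread_ids.size} worker thread(s)"
```

Because tasks are popped from one queue by one thread, they run in submission order and never overlap, which is why the fibers in the script need no locking of their own.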