Skip to content

Instantly share code, notes, and snippets.

@ioquatix
Last active March 28, 2017 02:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ioquatix/5b1572079864031bb618bdfd6d85d8c8 to your computer and use it in GitHub Desktop.
Save ioquatix/5b1572079864031bb618bdfd6d85d8c8 to your computer and use it in GitHub Desktop.
Rough outline of fiber-based concurrency.
#!/usr/bin/env ruby
require 'fiber'
require 'nio'
# Namespace for the fiber-based asynchronous IO helpers defined in this file.
module Async
end

# Holds the IO objects handed to a task, each one wrapped via Async::IO.wrap
# so it can be used together with the given reactor.
class Async::Await
  # The wrapped IO objects, in the same order as the `ios` passed to #initialize.
  attr_reader :ios

  # @param ios [Enumerable] raw IO objects; each is passed to Async::IO.wrap
  # @param reactor [Object] forwarded to Async::IO.wrap next to every IO
  def initialize(ios, reactor)
    @ios = ios.map { |raw| Async::IO.wrap(raw, reactor) }
  end

  # Placeholder for outgoing connections; not implemented yet.
  #
  # @raise [NotImplementedError] always
  def connect(addr, port)
    raise NotImplementedError
  end

  # Calls #close on every wrapped IO.
  #
  # @return [Array] the list of wrappers
  def close
    @ios.each { |wrapped| wrapped.close }
  end
end
# Base class for IO wrappers that can suspend the current fiber until the
# reactor reports the underlying IO as ready. Any method not defined here is
# forwarded to the wrapped IO.
class Async::IO
  # Picks the wrapper class that matches the type of +io+.
  #
  # @param io [TCPServer, TCPSocket] the raw socket to wrap
  # @param args [Array] extra constructor arguments (the reactor)
  # @return [Async::TCP::Server, Async::TCP::Socket]
  # @raise [ArgumentError] when +io+ is neither a TCPServer nor a TCPSocket
  def self.wrap(io, *args)
    case io
    when TCPServer # tested before TCPSocket, as in the original ordering
      Async::TCP::Server.new(io, *args)
    when TCPSocket
      Async::TCP::Socket.new(io, *args)
    else
      raise ArgumentError, "Don't know how to wrap #{io.class}!"
    end
  end

  # @param io [#to_io] the object being wrapped
  # @param reactor [#register] used to obtain a monitor on first wait
  def initialize(io, reactor)
    @io = io
    @reactor = reactor
    @monitor = nil
  end

  # @return [IO] the result of calling #to_io on the wrapped object
  def to_io
    @io.to_io
  end

  # Makes sure a monitor exists for the wrapped IO, sets it to +interests+
  # and attaches the calling fiber to it.
  #
  # @param interests [Symbol] :r, :w or :rw
  # @return [Fiber] the fiber now stored as the monitor's value
  def monitor(interests)
    if @monitor
      @monitor.interests = interests
    else
      @monitor = @reactor.register(to_io, interests)
    end

    # Rebind on every call: the wrapper may be waited on from a different
    # fiber than the one that registered the monitor.
    @monitor.value = Fiber.current
  end

  # Suspends the current fiber after requesting read interest.
  def wait_readable
    Fiber.yield monitor(:r)
  end

  # Suspends the current fiber after requesting write interest.
  def wait_writable
    Fiber.yield monitor(:w)
  end

  # Suspends the current fiber after requesting read and write interest.
  def wait_any
    Fiber.yield monitor(:rw)
  end

  # Forwards unknown messages to the wrapped IO.
  def method_missing(name, *args, &block)
    @io.send(name, *args, &block)
  end

  # Keeps #respond_to? in agreement with the forwarding in #method_missing.
  def respond_to_missing?(name, include_private = false)
    @io.respond_to?(name, include_private) || super
  end

  # Closes the monitor, if one was created. The wrapped IO itself is not
  # closed here.
  def close
    return unless @monitor

    @monitor.close
    # Forget the closed monitor so that a later wait registers a fresh one.
    @monitor = nil
  end
end
# TCP specific wrappers. Each operation tries the non-blocking call first and,
# when it would block, suspends the current fiber until the reactor resumes
# it, then tries again.
module Async::TCP
  # Wrapper around a listening socket.
  class Server < Async::IO
    # Accepts a connection without blocking the thread.
    #
    # @param args [Array] forwarded to accept_nonblock
    # @return [Object] whatever accept_nonblock returns
    def accept(*args)
      @io.accept_nonblock(*args)
    rescue IO::WaitReadable, Errno::EINTR
      wait_readable
      retry # intentionally unbounded: each retry follows a wake-up
    end
  end

  # Wrapper around a connected socket.
  class Socket < Async::IO
    # Reads without blocking the thread.
    #
    # @param args [Array] forwarded to read_nonblock
    # @return [Object] whatever read_nonblock returns
    def read(*args)
      @io.read_nonblock(*args)
    rescue IO::WaitReadable, Errno::EINTR
      wait_readable
      retry
    end

    # Writes without blocking the thread. The result of write_nonblock is
    # returned as is; there is no loop here to complete a partial write.
    #
    # @param args [Array] forwarded to write_nonblock
    # @return [Object] whatever write_nonblock returns
    def write(*args)
      @io.write_nonblock(*args)
    rescue IO::WaitWritable, Errno::EINTR
      # A write that would block is reported as IO::WaitWritable, so the
      # fiber has to wait for writability, not readability.
      wait_writable
      retry
    end
  end
end
# Minimal event loop: keeps a list of task fibers and resumes them, either in
# a start-up pass or when the selector reports one of their monitors as ready.
class Reactor
  def initialize
    puts "Creating selector"
    @selector = NIO::Selector.new
    @fibers = []
  end

  # Queues a new task. The block runs inside a fiber once #run_forever
  # resumes it and receives the wrapped IOs followed by the Async::Await
  # object that owns them; the wrappers are closed when the block exits.
  #
  # @param ios [Array] raw IO objects to wrap for the task
  # @return [Array<Fiber>] the internal fiber list
  def async(*ios)
    @fibers << Fiber.new do
      await = Async::Await.new(ios, self)
      begin
        yield(*await.ios, await)
      ensure
        await.close
      end
    end
  end

  # Registers with the underlying selector; arguments are passed through.
  def register(*args)
    @selector.register(*args)
  end

  # Runs until no task fiber is alive any more.
  #
  # Each outer pass resumes every live fiber once (this is what starts newly
  # queued tasks), then services selector events until a select call with a
  # one second timeout reports nothing.
  def run_forever
    until @fibers.empty?
      puts "Updating #{@fibers.count} fibers..."
      @fibers.each { |fiber| fiber.resume if fiber.alive? }
      # Filter on liveness instead of on the value returned by resume, so a
      # fiber that yields nil or false is not dropped while still running.
      @fibers.select!(&:alive?)

      puts "Selecting with #{@fibers.count} fibers..."
      while (monitors = @selector.select(1))
        monitors.each do |monitor|
          waiting = monitor.value
          # Resuming a finished fiber raises FiberError, so skip those.
          waiting.resume if waiting && waiting.alive?
        end
      end
    end
  end

  # Accepts a connection on +server+, suspending the current fiber while no
  # connection is pending. Must be called from inside a fiber.
  #
  # NOTE(review): +server+ is registered with the selector for the duration
  # of the wait; presumably it must not already be registered elsewhere -
  # confirm against the selector's behavior.
  #
  # @param server [#accept_nonblock] the listening socket
  # @return [Object] whatever accept_nonblock returns
  def accept(server)
    server.accept_nonblock
  rescue IO::WaitReadable, Errno::EINTR
    monitor = @selector.register(server, :r)
    monitor.value = Fiber.current
    begin
      Fiber.yield
    ensure
      # Always release the temporary registration before trying again.
      monitor.close
    end
    retry # intentionally unbounded: each retry follows a wake-up
  end
end
# Demo: one server task and ten client tasks sharing a single reactor.
reactor = Reactor.new

puts "Creating server"
server = TCPServer.new("localhost", 6777)

# Server task: accept peers one after another and send each a single line.
reactor.async(server) do |listener|
  loop do
    puts "Accepting peer on server #{listener}"
    peer = listener.accept
    begin
      puts "Sending data to peer"
      peer << "data\n"
      peer.shutdown
    ensure
      # shutdown alone keeps the descriptor open; close it so this endless
      # loop does not leak one socket per connection.
      peer.close
    end
  end
end

# Client tasks: connect up front, then read the greeting inside the reactor.
10.times do |i|
  puts "Creating client #{i}"
  client = TCPSocket.new("localhost", 6777)
  reactor.async(client) do |stream|
    puts "Reading data on client #{i}"
    puts stream.read(1024)
  end
end

reactor.run_forever
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment