Skip to content

Instantly share code, notes, and snippets.

@jscheid
Created November 14, 2024 11:17
Show Gist options
  • Save jscheid/ca0247877eb6f874ea84a1369e07baaa to your computer and use it in GitHub Desktop.
Save jscheid/ca0247877eb6f874ea84a1369e07baaa to your computer and use it in GitHub Desktop.
# Shared, frozen exception instance that waiters raise into each other to
# cancel a pending IO wait once one of them has produced a result.
INTERRUPT = Interrupt.new.freeze

# Body of the helper tasks spawned by io_select_inner: waits (without a
# timeout) for `events` on `io`, records the readiness value in `result`, and
# then interrupts every other waiter that is still alive.
#
# Block parameters:
#   task    - the task running this lambda; it is never interrupted by itself.
#   io      - object responding to `wait(events, timeout)`.
#   events  - event mask to wait for.
#   result  - Hash shared by all waiters, mapping io => readiness value.
#   waiters - list of all participating tasks (including the spawning task).
WAITER =
  lambda do |task, io, events, result, waiters|
    ready = io.wait(events, nil)
    # Record only truthy readiness values. Storing nil/false in `result`
    # would later break consumers that apply bit-mask tests to every entry.
    return unless ready

    result[io] = ready
    waiters.each do |waiter|
      if waiter != task && waiter.alive?
        Fiber.scheduler.raise(waiter.fiber, INTERRUPT)
      end
    end
  rescue Interrupt
    # Cancelled while waiting (a sibling waiter raised into this task):
    # there is nothing to record.
  end
# Waits until at least one IO in `by_io` is ready or the deadline passes.
#
# The first IO is awaited on the current task (this is the only wait that
# honours the deadline); every other IO gets its own child task running
# WAITER. Whichever wait finishes first interrupts the others.
#
# by_io    - collection of [io, event_mask] pairs (e.g. a Hash io => mask).
# end_time - Time at which to give up, or nil to wait without a deadline.
#
# Returns a Hash mapping each ready io to the value its `wait` returned;
# the Hash is empty when nothing became ready.
def io_select_inner(by_io, end_time)
  pairs = by_io.to_a
  if pairs.empty?
    # Nothing to wait on. Enumerator#next would raise StopIteration here
    # (which silently terminates any enclosing `loop`), so handle the case
    # explicitly: wait out the deadline, if there is one, and report that
    # nothing is ready.
    sleep([end_time - Time.now, 0].max) if end_time
    return {}
  end

  (first_io, first_events), *others = pairs
  current = Async::Task.current
  waiters = [current]
  result = {}
  # NOTE(review): a child that finishes before the wait below has started
  # would raise INTERRUPT into this task outside the rescue - confirm whether
  # that can happen with the scheduler in use.
  others.each do |io, events|
    waiters << current.async(io, events, result, waiters, &WAITER)
  end

  first_ready =
    begin
      first_io.wait(first_events, ([end_time - Time.now, 0].max if end_time))
    rescue Interrupt
      # A child waiter finished first and cancelled this wait.
      nil
    end
  result[first_io] = first_ready if first_ready

  # Cancel every child waiter that is still pending (index 0 is this task).
  waiters.drop(1).each do |waiter|
    Fiber.scheduler.raise(waiter.fiber, INTERRUPT) if waiter.alive?
  end
  result
end
# Merges `event` into the event mask stored for each io of `ios` in `by_io`.
#
# by_io - Hash mapping io => event mask; mutated in place.
# ios   - collection of IO objects, or nil (treated as "no IOs", matching
#         the way unused sets are passed to IO.select).
# event - event bit(s) to OR into each io's mask.
#
# Returns `ios`.
def transpose_ios(by_io, ios, event)
  return if ios.nil?

  ios.each do |io|
    # An io that appears in several sets accumulates all requested events.
    by_io[io] = by_io.fetch(io, 0) | event
  end
end
# select(2)-style wait over several IO objects.
#
# readable   - IOs to watch for readability (nil is treated as empty).
# writeable  - IOs to watch for writability (optional; nil is treated as empty).
# exceptable - IOs to watch for priority/exceptional data (optional; nil is
#              treated as empty).
# timeout    - maximum number of seconds to wait, or nil for no limit.
#
# Returns [readable_ready, writable_ready, priority_ready], or nil when
# nothing became ready. When no IO is given at all, the timeout (if any) is
# waited out and nil is returned; without a timeout the call returns nil
# immediately.
def io_select(readable, writeable = nil, exceptable = nil, timeout = nil)
  by_io = {}
  # Unused sets are commonly passed as nil (as with IO.select), so normalise
  # them here before building the io => event-mask table.
  transpose_ios(by_io, readable || [], IO::READABLE)
  transpose_ios(by_io, writeable || [], IO::WRITABLE)
  transpose_ios(by_io, exceptable || [], IO::PRIORITY)

  if by_io.empty?
    # Nothing to watch: behave like a plain timed wait.
    sleep(timeout) if timeout
    return
  end

  end_time = Time.now + timeout if timeout
  result = io_select_inner(by_io, end_time)
  return if result.nil? || result.empty?

  r_ready = []
  w_ready = []
  e_ready = []
  result.each do |io, ready|
    r_ready << io if (ready & IO::READABLE).nonzero?
    w_ready << io if (ready & IO::WRITABLE).nonzero?
    e_ready << io if (ready & IO::PRIORITY).nonzero?
  end
  [r_ready, w_ready, e_ready]
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment