-
-
Save julik/f9e1ac1fda26ddc6fb58b024d548c866 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'logger'
require 'set'
require 'thread'

require 'nio'
# Finer hijack middleware | |
# Rack middleware that takes over partial-hijack responses.
#
# When the wrapped app returns a 'rack.hijack' response header, the proc in
# that header is replaced with one that merely pushes the raw socket and the
# app's original hijack proc onto STAGING_QUEUE. A single background thread
# (HANDLER_T) pops from that queue, wraps every hijack proc in a Fiber and
# resumes the Fiber each time an NIO::Selector reports the socket as writable.
# The hijack proc is therefore expected to call Fiber.yield whenever it wants
# to wait for the socket to become writable again.
class Turbofan
  # A wrapper for everything that has to do with a particular socket:
  # the socket itself, the Fiber running its hijack proc, and the monotonic
  # timestamp of the last time the Fiber was resumed.
  Triplet = Struct.new(:socket, :handling_fiber, :last_activated) do
    # Records `at` as the last activation time and resumes the handling fiber,
    # passing it the socket. Returns whatever the fiber yields or returns.
    # Raises FiberError if the fiber has already finished.
    def resume!(at)
      self.last_activated = at
      handling_fiber.resume(socket)
    end

    # Drops the fiber and closes the socket. Closing is best-effort: errors
    # raised by close are ignored on purpose, and calling this twice is safe.
    def discard!
      # Let the fiber go out of scope deliberately
      self.handling_fiber = nil
      # and same with the socket
      begin
        socket.close if socket
      rescue StandardError
        nil
      end
      self.socket = nil
    end

    # Needed so that the Triplet can remove itself from a Hash or Set:
    # Struct#hash depends on every member, and last_activated changes on
    # each resume, so the hash is pinned to the socket instead.
    def hash
      socket.hash
    end

    # True when more than SOCKET_TIMEOUT_SECONDS have passed between the last
    # activation and `at` (both on the same monotonic clock).
    def dormant?(at)
      (at - last_activated) > SOCKET_TIMEOUT_SECONDS
    end
  end

  DEFAULT_LOGGER = Logger.new($stderr)
  DEFAULT_LOGGER.level = Logger::WARN

  # Evict sockets that did not accept writes for longer than this time
  SOCKET_TIMEOUT_SECONDS = 30

  # Staging area for [socket, hijack_proc] pairs we receive from other threads
  STAGING_QUEUE = Queue.new

  # The reactor thread. There is no reason this could not be an array of
  # handler threads, as a selector is always encased in one thread; they
  # could all be consumers on the same queue.
  HANDLER_T = Thread.new do
    selector = NIO::Selector.new
    active_triplets = Set.new

    loop do
      now = Process.clock_gettime(Process::CLOCK_MONOTONIC)

      # First accept any new sockets into the reactor
      loop do
        # Non-blocking pop raises ThreadError when the queue is empty
        staged = begin
          STAGING_QUEUE.pop(true)
        rescue ThreadError
          nil
        end
        break unless staged

        DEFAULT_LOGGER.info { "Turbofan accepting a socket for work #{staged.inspect}" }
        actual_socket, actual_proc = staged

        # Create a Fiber that handles this particular socket
        handler_fiber = Fiber.new { |socket_once_writable| actual_proc.call(socket_once_writable) }

        # and wrap the socket and the fiber into one unit of data which will
        # also keep track of when it last became writable
        tri = Triplet.new(actual_socket, handler_fiber, now)

        begin
          # Register the socket with the selector, and set the triplet as the
          # value of the monitor so that we can recover it from a select()
          monitor = selector.register(actual_socket, :w)
          monitor.value = tri
          # Only track the triplet once it is actually registered
          active_triplets << tri
        rescue => e
          # One bad socket must not take the whole reactor thread down
          DEFAULT_LOGGER.warn { "Turbofan: could not register #{tri} - #{e.class} - #{e.message}, socket will be closed" }
          tri.discard!
        end
      end

      # Then select on the selector, and for all selected IOs that became writable...
      kill_list = []
      did_dispatch_n = selector.select(0.25) do |ready_monitor|
        tri = ready_monitor.value
        begin
          # ...resume each one of them
          DEFAULT_LOGGER.debug { "Turbofan: resuming #{tri}" }
          tri.resume!(now)
        rescue FiberError # dead fiber, assume hijack proc terminated normally
          DEFAULT_LOGGER.debug { "Turbofan: #{tri} fiber terminated" }
          kill_list << tri
        rescue => e
          DEFAULT_LOGGER.warn { "Turbofan: task #{tri} raised #{e.class} - #{e.message} and will be evicted, socket will be closed" }
          kill_list << tri
        end
      end

      # Then find out which fibers haven't been activated for a _loong_ while
      # and forcibly evict them. Do not modify the set in-place.
      kill_list.concat(active_triplets.select { |tri| tri.dormant?(now) })

      # Terminate all triplets in the kill list. Delete from the set first:
      # discard! clears the socket, which the triplet's hash is derived from.
      kill_list.each do |tri|
        active_triplets.delete(tri)
        selector.deregister(tri.socket)
        tri.discard!
      end

      # Output stats.
      # NOTE(review): this is logged at WARN on every loop iteration, so it is
      # emitted at the default logger level - confirm that is intended.
      DEFAULT_LOGGER.warn do
        {n_in_reactor: active_triplets.length, n_resumed: did_dispatch_n.to_i, n_evicted: kill_list.length}
      end
    end
  end

  # app - the downstream Rack application to wrap.
  def initialize(app)
    @app = app
  end

  # Calls the wrapped app. A response without a 'rack.hijack' header is
  # returned as-is. Otherwise the header is replaced with a proc that hands
  # the raw socket and the app's own hijack proc over to the reactor thread.
  def call(env)
    status, headers, body = @app.call(env)
    return [status, headers, body] unless headers['rack.hijack']

    unwrapped_hijack_proc = headers.delete('rack.hijack')
    # The actual hijack proc, which will be called by the webserver (puma),
    # will transfer both the socket _and_ the hijack proc the upstream app
    # has defined to the turbofan.
    # Fibers may not transcend thread boundaries. Procs can though!
    wrapped_hijack_proc = ->(raw_socket) {
      STAGING_QUEUE.push([raw_socket, unwrapped_hijack_proc])
    }
    headers['rack.hijack'] = wrapped_hijack_proc
    [status, headers, body]
  end
end
# Rackup script: put the Turbofan middleware in front of the demo app below.
use Turbofan

# Demo payload: 5 MiB of random bytes, sent 200 times over per response.
sz = 1024 * 1024 * 5
bigdata = Random.new.bytes(sz)
total_bytes = sz * 200

# Demo app: answers every request with a partial hijack that streams exactly
# total_bytes bytes, matching the Content-Length header it advertises.
app = ->(_env) {
  # Runs inside a Fiber: every Fiber.yield hands control back to whoever
  # resumed it, so one write attempt is made per resume.
  takeover = ->(socket) {
    sent = 0
    while sent < total_bytes
      # Continue where the previous (possibly partial) write stopped,
      # wrapping around the payload and never exceeding Content-Length.
      offset = sent % sz
      chunk_len = [sz - offset, total_bytes - sent].min
      result = socket.write_nonblock(bigdata.byteslice(offset, chunk_len), exception: false)
      # With exception: false a full send buffer yields :wait_writable
      # instead of a byte count; nothing was written in that case.
      sent += result if result.is_a?(Integer)
      Fiber.yield if sent < total_bytes
    end
  }
  [200, {'Content-Length' => total_bytes.to_s, 'rack.hijack' => takeover}, []]
}

run app
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment