Skip to content

Instantly share code, notes, and snippets.

@julik
Last active January 18, 2019 02:59
Show Gist options
  • Save julik/f9e1ac1fda26ddc6fb58b024d548c866 to your computer and use it in GitHub Desktop.
Save julik/f9e1ac1fda26ddc6fb58b024d548c866 to your computer and use it in GitHub Desktop.
require 'thread'
require 'nio'
require 'logger'
# Finer hijack middleware
class Turbofan
# A wrapper for everything that has to do with a particular socket
class Triplet < Struct.new(:socket, :handling_fiber, :last_activated)
def resume!(at)
self.last_activated = at
self.handling_fiber.resume(self.socket)
end
def discard!
# Let the fiber go out of scope deliberately
self.handling_fiber = nil
# and same with the socket
self.socket.close rescue nil
self.socket = nil
end
# Needed so that the Triplet can remove itself from a Hash or Set
def hash
self.socket.hash
end
def dormant?(at)
(at - self.last_activated) > SOCKET_TIMEOUT_SECONDS
end
end
DEFAULT_LOGGER = Logger.new($stderr)
DEFAULT_LOGGER.level = Logger::WARN
# Evict sockets that did not accept writes for longer than this time
SOCKET_TIMEOUT_SECONDS = 30
# Staging area for hijack procs we receive from other threads
STAGING_QUEUE = Queue.new
# Speaking of which - there is exactly 0 reasons this could not
# be an array of handler threads, as a selector is always encased
# in one thread. They can all be consumers on the same queue.
HANDLER_T = Thread.new do
selector = NIO::Selector.new
active_triplets = Set.new
loop do
now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
# First accept any new sockets into the reactor
loop do
maybe_socket_and_proc = STAGING_QUEUE.pop(_nonblock=true) rescue nil
break unless maybe_socket_and_proc
DEFAULT_LOGGER.info { "Turbofan accepting a socket for work #{maybe_socket_and_proc.inspect}" }
actual_socket, actual_proc = maybe_socket_and_proc
# Create a Fiber that handles this particular socket
handler_fiber = Fiber.new { |socket_once_writable| actual_proc.call(socket_once_writable) }
# and wrap the socket and the fiber into one unit of data which will also keep track
# of when did it last become writable
tri = Triplet.new(actual_socket, handler_fiber, now)
active_triplets << tri
# Register the socket with the selector, and set the triplet as the value of the monitor
# so that we can recover it from a select()
monitor = selector.register(actual_socket, :w)
monitor.value = tri
end
# Then select on the selector, and for all selected IOs that became writable...
kill_list = []
did_dispatch_n = selector.select(0.25) do |ready_monitor|
tri = ready_monitor.value
begin
# ...resume each one of them
DEFAULT_LOGGER.debug { "Turbofan: resuming #{tri}" }
tri.resume!(_at=now)
rescue FiberError # dead fiber, assume hijack proc terminated normally
DEFAULT_LOGGER.debug { "Turbofan: #{tri} fiber terminated" }
kill_list << tri
rescue => e
DEFAULT_LOGGER.warn { "Turbofan: task #{tri} raised #{e.class} - #{e.message} and will be evicted, socket will be closed" }
kill_list << tri
end
end
# Then find out which fibers haven't been activated for a _loong_ while
# and forcibly evict them. Do not modify the set in-place.
if active_triplets.length > 0
kill_list += active_triplets.select {|tri| tri.dormant?(now) }
end
# Terminate all triplets in the kill list
kill_list.each do |tri|
active_triplets.delete(tri)
selector.deregister(tri.socket)
tri.discard!
end
# Output stats
DEFAULT_LOGGER.warn do
{n_in_reactor: active_triplets.length, n_resumed: did_dispatch_n.to_i, n_evicted: kill_list.length}
end
end
end
def initialize(app)
@app = app
end
def call(env)
status, headers, body = @app.call(env)
return [status, headers, body] unless headers['rack.hijack']
unwrapped_hijack_proc = headers.delete('rack.hijack')
# The actual hijack proc, which will be called by the webserver (puma)
# will transfer both the socket _and_ the hijack proc the upstream app
# has defined to the turbofan.
# Fibers may not transcend thread boundaries. Procs can though!
wrapped_hijack_proc = ->(raw_socket) {
STAGING_QUEUE.push([raw_socket, unwrapped_hijack_proc])
}
headers['rack.hijack'] = wrapped_hijack_proc
[status, headers, body]
end
end
use Turbofan
sz = 1024 * 1024 * 5
bigdata = Random.new.bytes(sz)
app = ->(env) {
takeover = ->(socket) {
loop do
written = socket.write_nonblock(bigdata, exception: false)
Fiber.yield
end
}
[200, {'Content-Length' => (sz * 200).to_s, 'rack.hijack' => takeover}, []]
}
run app
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment