This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require "singleton" | |
require "puma" | |
require "puma/plugin" | |
Puma::Plugin.create do | |
# Per-request deadline tracker. Watchdog::Middleware creates one instance per
# Rack request and registers it with Watchdog::Registry; the plugin's monitor
# loop polls the registry and escalates once any watcher has expired.
#
# Application code can reach the current request's watcher through
# env["watchdog"] and call #extend to push its deadline out.
class Watchdog
  # Raised by #extend when the watcher's deadline has already passed.
  class AlreadyExpired < RuntimeError; end

  # Raised into a request thread by #kill. Deliberately an Exception rather
  # than a StandardError so that a plain `rescue => e` in application code
  # does not swallow it.
  class Expiry < Exception; end

  # Rack middleware that wraps every request in a Watchdog.
  class Middleware
    # @param inner [#call] the downstream Rack application
    def initialize(inner)
      @inner = inner
    end

    # Registers a watcher for the lifetime of the request (including body
    # streaming) and exposes it to the app as env["watchdog"].
    #
    # When a response is handed back, the watcher is removed as soon as the
    # returned body is closed. When no response is handed back (the app raised,
    # or #extend raised AlreadyExpired), the watcher is removed immediately and
    # any body the app produced is closed. This keeps the monitor from later
    # Thread#raise-ing into a thread that has already moved on to other work.
    #
    # @param env [Hash] the Rack environment
    # @return [Array] the Rack triplet, with the body wrapped in a BodyProxy
    # @raise [AlreadyExpired] if the request overran its initial deadline
    def call(env)
      registry = Registry.instance
      env["watchdog"] = watcher = Watchdog.new
      registry.add(watcher)
      handed_off = false
      begin
        status, headers, body = @inner.call(env)
        # Allow at least another 60 seconds (extend's default) for the body
        # to be streamed; raises if the request has already overrun.
        watcher.extend
        proxy = Rack::BodyProxy.new(body) { registry.remove(watcher) }
        handed_off = true
        [status, headers, proxy]
      ensure
        unless handed_off
          registry.remove(watcher)
          # We are discarding the app's body, so honour its close contract.
          body.close if body.respond_to?(:close)
        end
      end
    end
  end

  # Process-wide set of in-flight watchers. Request threads add and remove
  # entries (Middleware#call) while the monitor thread iterates them.
  # All access to the backing hash is serialized by a mutex. Iteration walks a
  # snapshot, so a concurrent #add cannot trip Ruby's "can't add a new key into
  # hash during iteration" error.
  class Registry
    include Singleton

    # The most recent mutation as [:add, watcher] or [:remove, watcher];
    # nil until the first mutation.
    attr_reader :last_change

    def initialize
      @mutex = Mutex.new
      @store = {} # used as an insertion-ordered set; values are unused
      @last_change = nil
    end

    # @return [Array] the new last_change value
    def add(watcher)
      @mutex.synchronize do
        @store[watcher] = nil
        @last_change = [:add, watcher]
      end
    end

    # @return [Array] the new last_change value
    def remove(watcher)
      @mutex.synchronize do
        @store.delete(watcher)
        @last_change = [:remove, watcher]
      end
    end

    # Yields each registered watcher in insertion order. Without a block,
    # returns an Enumerator. The lock is released before yielding, so the
    # block may itself call #add or #remove.
    def each(&block)
      snapshot = @mutex.synchronize { @store.keys }
      snapshot.each(&block)
    end
  end

  # @param initial_runtime [Numeric] seconds until the watcher expires
  # @param thread [Thread] the thread #kill will interrupt
  def initialize(initial_runtime = 30, thread = Thread.current)
    @thread = thread
    @deadline = now + initial_runtime
    @expired = false
  end

  # Moves the deadline to at least +runtime+ seconds from now; never shortens
  # it. Note that this shadows Object#extend for Watchdog instances.
  #
  # @raise [AlreadyExpired] if the watcher has already expired
  def extend(runtime = 60)
    check
    new_deadline = now + runtime
    @deadline = new_deadline if new_deadline > @deadline
  end

  # True once the deadline has passed. The result is memoized, so once this
  # returns true it keeps returning true.
  def expired?
    @expired ||= @deadline < now
  end

  # Whole seconds left before the deadline, truncated toward zero (negative
  # once the deadline is more than a second past).
  def _remain
    (@deadline - now).to_i
  end

  # Asynchronously raises Expiry in the watched thread via Thread#raise.
  def kill
    @thread.raise(Expiry, "Request watchdog expired")
  end

  private

  def check
    raise AlreadyExpired, "Request watchdog expired" if expired?
  end

  # Monotonic clock, so wall-clock adjustments never move deadlines.
  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
# Puma plugin hook. Keeps the launcher (monitor_loop uses it to stop and halt
# the server) and runs the monitor on a Puma-managed background thread.
def start(launcher)
  @launcher = launcher
  in_background { monitor_loop }
end
# Puma plugin config hook.
#
# Installs Watchdog::Middleware when the config object supports
# `use_middleware`. Where workers are supported, it also starts a monitor
# thread in each booted worker.
def config(config)
  config.use_middleware { |app| Watchdog::Middleware.new(app) } if config.respond_to?(:use_middleware)
  return unless workers_supported?

  config.on_worker_boot { Thread.new { monitor_loop } }
end
# Background supervisor. Waits for any in-flight request to overrun its
# deadline, then escalates through progressively harsher shutdown steps.
# It never returns normally: if nothing else has ended the process first, it
# finishes with exit!.
def monitor_loop
  # Phase 1 - normal operation: poll every 10 seconds until some registered
  # watcher reports expired?.
  sleep 10 until registry.each.any?(&:expired?)

  # Phase 2 - graceful stop. Tell Puma to shut down: it stops accepting new
  # work and waits for existing requests to complete.
  @launcher.stop

  # Poll once a second until nothing has been added to or removed from the
  # registry since the previous poll AND every remaining watcher has expired.
  # We can't see the incoming request queue, so this hands still-working
  # threads as much time as possible for whatever draining they do.
  last_seen = :unknown
  loop do
    settled = last_seen == registry.last_change
    break if settled && registry.each.all?(&:expired?)

    last_seen = registry.last_change
    sleep 1
  end

  # Phase 3 - forced unwind. Thread#raise into every remaining request thread
  # so it can at least _try_ to unwind. Then give those unwinds 15 seconds; if
  # they get through, Puma's still-pending shutdown should end the process
  # before we wake up.
  registry.each(&:kill)
  sleep 15

  # Phase 4 - anything still running is thoroughly overdue. Ask Puma for an
  # immediate halt and wait 30 seconds for the process to die.
  @launcher.halt
  sleep 30

  # Phase Omega - last resort. If even the halt is stuck, take the process down
  # ourselves rather than ending with a quiet return.
  exit! 1
end
# Shorthand for the process-wide watcher registry.
def registry
  Watchdog::Registry.instance
end
private :registry
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment