Skip to content

Instantly share code, notes, and snippets.

@matthewd
Created Sep 18, 2019
Embed
What would you like to do?
require "singleton"
require "puma"
require "puma/plugin"
Puma::Plugin.create do
# Per-request deadline tracker used by the watchdog plugin.
#
# Each request gets a Watchdog bound to the thread that created it. The
# watchdog is "expired" once its monotonic-clock deadline has passed; #kill
# asynchronously raises Expiry in the bound thread.
class Watchdog
  # Raised by #extend when the deadline has already passed.
  class AlreadyExpired < RuntimeError; end

  # Injected into the watched thread by #kill. It inherits from Exception
  # rather than StandardError, so a bare `rescue` / `rescue => e` in
  # application code will not catch it.
  class Expiry < Exception; end

  # Rack middleware that registers a Watchdog for each request, exposes it
  # as env["watchdog"], and unregisters it when the response body is closed.
  class Middleware
    # @param inner [#call] the wrapped Rack application
    def initialize(inner)
      @inner = inner
    end

    # @param env [Hash] Rack environment
    # @return [Array] [status, headers, body]; closing the body unregisters
    #   this request's watchdog
    # @raise [AlreadyExpired] if the request overran its deadline before
    #   the response was produced
    def call(env)
      registry = Registry.instance
      env["watchdog"] = watcher = Watchdog.new
      registry.add(watcher)
      handed_off = false

      status, headers, body = @inner.call(env)
      body = Rack::BodyProxy.new(body) { registry.remove(watcher) }
      # Give the response-delivery phase a fresh budget (never shortening
      # the deadline); raises AlreadyExpired if the request already overran.
      watcher.extend
      handed_off = true
      [status, headers, body]
    ensure
      # If the response never reaches the server (the app raised, or
      # #extend raised), nothing else will close the body or unregister the
      # watcher. A leaked watcher keeps pointing at this thread, so a later
      # #kill could hit whatever request the thread is serving by then.
      unless handed_off
        body.close if body.respond_to?(:close)
        registry.remove(watcher)
      end
    end
  end

  # Process-wide set of in-flight watchdogs.
  #
  # Request threads add and remove entries while the monitor thread
  # iterates, so every access to the underlying Hash goes through a Mutex,
  # and #each walks a snapshot. Adding a key to a Hash that another thread
  # is iterating raises "can't add a new key into hash during iteration".
  class Registry
    include Singleton

    # Most recent mutation as [:add, watcher] or [:remove, watcher]; nil
    # before any. Compared across polls to detect whether the set changed.
    attr_reader :last_change

    def initialize
      @lock = Mutex.new
      @store = {}
      @last_change = nil
    end

    # Registers +watcher+. Returns [:add, watcher].
    def add(watcher)
      @lock.synchronize do
        @store[watcher] = nil
        @last_change = [:add, watcher]
      end
    end

    # Unregisters +watcher+ (a no-op on the set if absent). Returns
    # [:remove, watcher].
    def remove(watcher)
      @lock.synchronize do
        @store.delete(watcher)
        @last_change = [:remove, watcher]
      end
    end

    # Yields each registered watchdog, or returns an Enumerator when no
    # block is given. Iterates a copy taken under the lock, so the block
    # may itself add or remove entries.
    def each(&block)
      @lock.synchronize { @store.keys }.each(&block)
    end
  end

  # @param initial_runtime [Numeric] seconds until the initial deadline
  # @param thread [Thread] thread that #kill will interrupt
  def initialize(initial_runtime = 30, thread = Thread.current)
    @thread = thread
    @deadline = now + initial_runtime
    @expired = false
  end

  # Moves the deadline to at least +runtime+ seconds from now; it is never
  # shortened. Note that this shadows Object#extend.
  # @raise [AlreadyExpired] if the deadline has already passed
  def extend(runtime = 60)
    check
    new_deadline = now + runtime
    @deadline = new_deadline if new_deadline > @deadline
  end

  # True once the deadline has passed. Sticky: stays true even if the
  # deadline would later be moved.
  def expired?
    @expired ||= @deadline < now
  end

  # Seconds remaining until the deadline, truncated toward zero.
  def _remain
    (@deadline - now).to_i
  end

  # Asynchronously raises Expiry in the watched thread.
  def kill
    @thread.raise(Expiry, "Request watchdog expired")
  end

  private

  def check
    raise AlreadyExpired, "Request watchdog expired" if expired?
  end

  # Monotonic clock, so wall-clock adjustments cannot move deadlines.
  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
# Puma plugin start hook. Keeps the launcher so #monitor_loop can later
# stop/halt the server, then hands #monitor_loop to in_background so it
# runs off the main thread.
def start(launcher)
  @launcher = launcher
  watchdog_loop = method(:monitor_loop)
  in_background(&watchdog_loop)
end
# Puma plugin config hook.
#
# Installs Watchdog::Middleware when the config object supports
# use_middleware. When workers are supported, also registers an
# on_worker_boot hook that starts a fresh #monitor_loop thread in each
# booted worker.
def config(config)
  install_middleware = config.respond_to?(:use_middleware)
  config.use_middleware { |app| Watchdog::Middleware.new(app) } if install_middleware
  return unless workers_supported?

  config.on_worker_boot { Thread.new(&method(:monitor_loop)) }
end
# Watchdog monitor loop. Runs in a background thread: scheduled via
# in_background from #start and, when workers are supported, started in a
# new Thread from the on_worker_boot hook registered in #config.
#
# Escalates in phases once any registered watchdog has expired:
#   1. poll every 10s until some watcher is expired;
#   2. @launcher.stop, then poll every 1s until every watcher is expired
#      and the registry's last_change is unchanged since the previous poll;
#   3. Thread#raise Expiry into each registered watcher's thread, wait 15s;
#   4. @launcher.halt, wait 30s;
#   Ω. exit!(1) if the process is still alive.
#
# Never returns normally: the final statement is exit!.
def monitor_loop
# Phase 1: Normal operation
#
# Every 10 seconds, we check that no watchers have expired.
until registry.each.any?(&:expired?)
sleep 10
end
# Phase 2: Abort due to watcher expiration
# Tell puma to shut down: it'll stop accepting new work, and wait
# for existing requests to complete.
@launcher.stop
# Wait until all known watchers (all in-flight work) are expired,
# and no other watchers are being created or destroyed. This is a
# hack around the fact we can't see the incoming request queue, and
# we want to give the still-working threads as much time as possible
# to do any draining they might be configured for.
#
# The :unknown sentinel never equals a real last_change, so the loop
# always sleeps at least once. An empty registry satisfies all?, so the
# loop then ends after a quiet 1s poll.
previous_change = :unknown
until previous_change == registry.last_change &&
registry.each.all?(&:expired?)
previous_change = registry.last_change
sleep 1
end
# Phase 3: With any innocent bystanders out of the way, we can now
# use the terrible, horrible, no good, very bad Thread#raise.
# They're going to get destructively aborted somehow, so we might as
# well let them _try_ to unwind.
registry.each(&:kill)
# Give those unwinds a chance to complete. If those exceptions make
# it through, the bad requests should be cleared and Puma's
# still-pending shutdown will kill the process before we wake up.
sleep 15
# Phase 4: Anything still running now is thoroughly overdue, and
# therefore unlikely to ever finish by itself. Time to be more
# aggressive: tell Puma to go for an immediate halt.
@launcher.halt
# That should definitely do it. Wait for the process to die.
sleep 30
# Phase Ω: Just in case something's _really_ stuck, if the halt
# fails to complete in the above unreasonably-generous time, we'll
# attempt to just take down the process ourselves.
#
# I can't think of any particular reason it would come to this, but
# it seems less unreasonable than ending this method with a quiet
# return.
#
# exit! skips at_exit handlers and ensure blocks; the status 1 marks
# the exit as a failure.
exit! 1
end
# Shortcut to the process-wide Watchdog::Registry singleton (private).
def registry
  Watchdog::Registry.instance
end
private :registry
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment