@aaronjensen
Last active November 28, 2022 06:59
Phoenix Drain Stop
# ATTENTION: This is now supported in plug_cowboy as of 2.1.0:
# https://hexdocs.pm/plug_cowboy/Plug.Cowboy.Drainer.html
defmodule DrainStop do
  @moduledoc """
  DrainStop attempts to gracefully shut down an endpoint when a normal shutdown
  occurs. It first shuts down the acceptors, ensuring that no new requests can
  be made. It then waits for all pending requests to complete. If the timeout
  expires before this happens, it stops waiting, allowing the supervision tree
  to continue its shutdown order.

  DrainStop should be installed in your supervision tree *after* the endpoint
  it is going to drain stop. Supervisors stop their children in reverse start
  order, so DrainStop is terminated (and drains) while the endpoint is still
  running.

  DrainStop takes two options:

    * `endpoint`: The `Phoenix.Endpoint` to drain stop. Required.
    * `timeout`: The amount of time, in milliseconds, to allow for requests to
      finish. Defaults to `5000`.

  Give the DrainStop child a `shutdown` value larger than `timeout`; otherwise
  the supervisor may kill it before it finishes waiting.

  For example:

      import Supervisor.Spec

      children = [
        supervisor(MyApp.Endpoint, []),
        worker(
          DrainStop,
          [[timeout: 10_000, endpoint: MyApp.Endpoint]],
          [shutdown: 15_000]
        )
      ]
  """
  use GenServer
  require Logger
  import Supervisor, only: [which_children: 1, terminate_child: 2]

  def start_link(options) do
    GenServer.start_link(DrainStop, options)
  end

  def init(options) do
    # Trap exits so the supervisor's shutdown signal invokes terminate/2.
    Process.flag(:trap_exit, true)
    endpoint = Keyword.fetch!(options, :endpoint)
    timeout = Keyword.get(options, :timeout, 5000)
    {:ok, {endpoint, timeout}}
  end

  # Drain only on orderly shutdowns; any other exit reason skips draining.
  def terminate(:shutdown, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout)
  def terminate({:shutdown, _}, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout)
  def terminate(:normal, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout)
  def terminate(_, _), do: :ok

  def drain_endpoint(endpoint, timeout) do
    # Stop accepting new connections first, then wait for in-flight requests.
    stop_listening(endpoint)
    wait_for_requests(endpoint, timeout)
  end

  def wait_for_requests(endpoint, timeout) do
    Logger.info("DrainStop starting graceful shutdown with timeout: #{timeout}")
    # Delivers {:timeout, timer_ref, :timeout} to this process when time is up.
    timer_ref = :erlang.start_timer(timeout, self(), :timeout)
    do_wait_for_requests(endpoint, timer_ref, %{})
  end

  defp do_wait_for_requests(endpoint, timer_ref, refs) do
    # Reuse existing monitors; only monitor request pids not seen before.
    get_monitor = fn pid ->
      refs[pid] || Process.monitor(pid)
    end

    # Re-scan the tree each time so finished requests drop out of the map.
    refs =
      endpoint
      |> pending_requests()
      |> Map.new(&{&1, get_monitor.(&1)})

    case Map.size(refs) do
      0 ->
        Logger.info("DrainStop Successful, no more connections")
        :erlang.cancel_timer(timer_ref)

      n ->
        time_left = :erlang.read_timer(timer_ref)
        Logger.info("DrainStop waiting #{time_left} msec for #{n} more connections to shutdown")

        receive do
          {:DOWN, _monitor_ref, _, _, _} ->
            do_wait_for_requests(endpoint, timer_ref, refs)

          {:timeout, ^timer_ref, :timeout} ->
            Logger.error("DrainStop timeout")

          msg ->
            Logger.error("DrainStop unexpected msg: #{inspect(msg)}")
            do_wait_for_requests(endpoint, timer_ref, refs)
        end
    end
  end
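
  # PIDs of the request processes supervised by each Ranch connections
  # supervisor under the endpoint.
  def pending_requests(endpoint) do
    for pid <- ranch_listener_sup_pids(endpoint),
        {:ranch_conns_sup, sup_pid, _, _} <- which_children(pid),
        {_, request_pid, _, _} <- which_children(sup_pid),
        do: request_pid
  end

  # Ranch listener supervisors under the endpoint's Phoenix.Endpoint.Server child.
  def ranch_listener_sup_pids(endpoint) do
    for {Phoenix.Endpoint.Server, server_pid, _, _} <- which_children(endpoint),
        {{:ranch_listener_sup, _}, listener_pid, _, _} <- which_children(server_pid),
        do: listener_pid
  end

  # Terminating the acceptors supervisor stops new connections from being
  # accepted while existing connections keep running.
  def stop_listening(endpoint) do
    endpoint
    |> ranch_listener_sup_pids()
    |> Enum.each(&terminate_child(&1, :ranch_acceptors_sup))
  end
end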

defmodule DrainStopTest do
  use ExUnit.Case, async: false
  use Phoenix.ConnTest

  defmodule TestSupervisor do
    @drain_stop_timeout 100

    def start_link do
      import Supervisor.Spec, warn: false

      children = [
        # Start the endpoint when the application starts
        supervisor(DrainStopTest.TestEndpoint, []),
        worker(
          DrainStop,
          [[timeout: @drain_stop_timeout, endpoint: DrainStopTest.TestEndpoint]],
          [shutdown: @drain_stop_timeout * 2]
        )
      ]

      opts = [strategy: :one_for_one]
      Supervisor.start_link(children, opts)
    end
  end

  defmodule TestPlug do
    def init(opts), do: opts

    # Reports start/end to the test process and sleeps ?sleep=<ms> in between.
    def call(conn, _) do
      pid = Application.get_env(:drain_stop_test, :test_pid)
      conn = Plug.Conn.fetch_query_params(conn)
      send(pid, :request_start)
      {time, _} = Integer.parse(conn.params["sleep"])
      :timer.sleep(time)
      send(pid, :request_end)
      # Send a response so the adapter does not raise for an unsent conn.
      Plug.Conn.send_resp(conn, 200, "ok")
    end
  end

  defmodule TestEndpoint do
    use Phoenix.Endpoint, otp_app: :drain_stop_test
    plug DrainStopTest.TestPlug
  end

  @endpoint DrainStopTest.TestEndpoint

  setup do
    Application.put_env(:drain_stop_test, :test_pid, self())
    Application.put_env(:drain_stop_test, DrainStopTest.TestEndpoint,
      http: [port: "4807"], url: [host: "example.com"], server: true)

    {:ok, pid} = DrainStopTest.TestSupervisor.start_link()
    Process.flag(:trap_exit, true)

    on_exit(fn ->
      if Process.whereis(DrainStopTest.TestEndpoint) do
        Supervisor.stop(DrainStopTest.TestEndpoint)
      end

      # :kill cannot be trapped; :brutal_kill is only a child-spec shutdown value.
      Process.exit(pid, :kill)
    end)

    {:ok, pid: pid}
  end

  test "waits for request to finish", %{pid: pid} do
    Task.async(fn ->
      HTTPoison.get("http://localhost:4807/?sleep=50")
    end)

    assert_receive :request_start, 1000
    Supervisor.stop(pid, :shutdown)
    assert_received :request_end
  end

  test "truncates requests that don't finish in time", %{pid: pid} do
    Task.async(fn ->
      HTTPoison.get("http://localhost:4807/?sleep=500")
    end)

    assert_receive :request_start, 1000
    Supervisor.stop(pid, :shutdown)
    refute_received :request_end
  end

  test "does not allow new requests" do
    # This is harder to test without reaching into internals...
    DrainStop.stop_listening(DrainStopTest.TestEndpoint)

    assert_raise(HTTPoison.Error,
      fn -> HTTPoison.get!("http://localhost:4807/?sleep=500") end)
  end
end
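
DrainStop's waiting step (`wait_for_requests/2` and `do_wait_for_requests/3`) combines two OTP primitives: a monitor on each in-flight request process, and a deadline timer from `:erlang.start_timer/3` whose `{:timeout, ref, msg}` message ends the wait early. The sketch below isolates that pattern with plain processes standing in for Cowboy request handlers, so it runs without Phoenix; the module `DrainWaitSketch` and its return values are hypothetical, not part of DrainStop.

```elixir
defmodule DrainWaitSketch do
  # Hypothetical helper mirroring DrainStop's wait loop with plain processes.
  # Returns {:drained, []} if every pid exits before the deadline, otherwise
  # {:timeout, pids_still_running}.
  def wait(pids, timeout) do
    # One monitor per process; a :DOWN message arrives when it exits
    # (immediately, with reason :noproc, if it is already dead).
    monitors = Map.new(pids, fn pid -> {Process.monitor(pid), pid} end)
    # Sends {:timeout, timer_ref, :drain_deadline} to this process after `timeout` ms.
    timer_ref = :erlang.start_timer(timeout, self(), :drain_deadline)
    loop(monitors, timer_ref)
  end

  defp loop(monitors, timer_ref) when map_size(monitors) == 0 do
    # Everything exited: cancel the deadline and flush it in case it already fired.
    :erlang.cancel_timer(timer_ref)

    receive do
      {:timeout, ^timer_ref, :drain_deadline} -> :ok
    after
      0 -> :ok
    end

    {:drained, []}
  end

  defp loop(monitors, timer_ref) do
    receive do
      {:DOWN, mref, :process, _pid, _reason} ->
        # Deleting an unknown ref is a no-op, so unrelated monitors are harmless.
        loop(Map.delete(monitors, mref), timer_ref)

      {:timeout, ^timer_ref, :drain_deadline} ->
        # Deadline hit: stop monitoring the stragglers and report them.
        Enum.each(Map.keys(monitors), &Process.demonitor(&1, [:flush]))
        {:timeout, Map.values(monitors)}
    end
  end
end

fast = spawn(fn -> Process.sleep(20) end)
slow = spawn(fn -> Process.sleep(5_000) end)

# => {:drained, []}
IO.inspect(DrainWaitSketch.wait([fast], 500))
# => {:timeout, [#PID<...>]} holding the slow pid
IO.inspect(DrainWaitSketch.wait([slow], 50))
```

Unlike this sketch, which waits on a fixed list of pids, DrainStop re-reads `pending_requests/1` from the supervision tree after every `:DOWN` message, so the set it waits on always reflects the current tree.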
@drewblas

Thanks for this. I think it'll be a big help for us!

@mmmries

mmmries commented Jun 25, 2016

Thanks for posting this 👍 very helpful for a use case that I have.

@xingxing

xingxing commented Sep 18, 2016

Thanks a lot for this! But how do I use it? I added DrainStop to my children following the docs.

Then I ran:

iex -S mix phoenix.server
Erlang/OTP 19 [erts-8.0.2] [source-9503fff] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false]

[info] Running MyPro.Endpoint with Cowboy using http://localhost:4000
Interactive Elixir (1.3.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Application.stop(:my_pro)
[info] stop listening
[info] DrainStop starting graceful shutdown with timeout: 10000
[info] DrainStop Successful, no more connections
:ok
[info] Application star_way_bff exited: :stopped

Perfect! It works! But how can I do this in production? The Phoenix server will be started as a daemon, so how can I stop the application?
Is there a mechanism in Elixir that can handle Unix kill signals?
Do you guys have a better idea?
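
The session above drains because `Application.stop/1` shuts the application's supervision tree down in reverse start order, and DrainStop calls `Process.flag(:trap_exit, true)` in `init/1`, so the supervisor's `:shutdown` exit signal becomes a call to `terminate(:shutdown, state)` instead of killing the process outright. Any orderly stop of the VM takes the same path for every running application: `:init.stop()`, which is what `System.stop/0` calls, and on OTP 20 and later the default SIGTERM handler also calls `init:stop/0`. A minimal sketch of the mechanism using only OTP, with a hypothetical `Recorder` module standing in for both the endpoint and DrainStop, and `Supervisor.child_spec/2` in place of the deprecated `Supervisor.Spec.worker/3` from the moduledoc:

```elixir
defmodule Recorder do
  # Hypothetical stand-in for DrainStop: reports which terminate/2 reason it saw.
  use GenServer

  def start_link({name, test_pid}), do: GenServer.start_link(__MODULE__, {name, test_pid})

  @impl true
  def init(state) do
    # Without this flag the supervisor's :shutdown signal would kill the
    # process and terminate/2 would never run.
    Process.flag(:trap_exit, true)
    {:ok, state}
  end

  @impl true
  def terminate(reason, {name, test_pid}) do
    send(test_pid, {:terminated, name, reason})
    :ok
  end
end

children = [
  # Started first, so stopped last (stands in for the endpoint).
  Supervisor.child_spec({Recorder, {:endpoint, self()}}, id: :endpoint),
  # Started after the endpoint, so stopped before it. `shutdown` is how long
  # the supervisor waits for terminate/2 before killing the process.
  Supervisor.child_spec({Recorder, {:drain_stop, self()}}, id: :drain_stop, shutdown: 15_000)
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
Supervisor.stop(sup)
# The mailbox now holds {:terminated, :drain_stop, :shutdown},
# followed by {:terminated, :endpoint, :shutdown}.
```

The `shutdown: 15_000` on the second child plays the same role as `[shutdown: 15_000]` in the moduledoc, which is why it has to exceed DrainStop's `timeout`.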

@PerishableDave

@xingxing, not sure if you're still looking for a solution, but I found https://github.com/tsutsu/signal_handler which lets Elixir applications intercept POSIX signals.

@nicolasgarnil

@aaronjensen are you planning to package this functionality into a library?

@jesseshieh

Does anybody know if this works with Elixir 1.4.5 and Phoenix 1.3? It doesn't seem to work for me. I put a :timer.sleep/1 in a controller, and when I call :init.stop() the connection is severed.
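
One thing worth ruling out when a request is cut off during `:init.stop()` (general OTP behaviour, not a confirmed diagnosis of this report) is the child's `shutdown` value. A supervisor waits only that many milliseconds for `terminate/2` to return before killing the process, and a worker added without an explicit `shutdown` gets 5000 ms, no longer than DrainStop's default 5000 ms `timeout`. The sketch below uses a hypothetical `SlowTerminate` worker whose `terminate/2` needs 500 ms but is given a 100 ms budget:

```elixir
defmodule SlowTerminate do
  # Hypothetical worker whose terminate/2 takes longer than its shutdown budget.
  use GenServer

  def start_link(test_pid), do: GenServer.start_link(__MODULE__, test_pid)

  @impl true
  def init(test_pid) do
    # Trap exits so terminate/2 runs on shutdown, as DrainStop does.
    Process.flag(:trap_exit, true)
    {:ok, test_pid}
  end

  @impl true
  def terminate(_reason, test_pid) do
    send(test_pid, :drain_started)
    # Simulates waiting 500 ms for in-flight requests.
    Process.sleep(500)
    send(test_pid, :drain_finished)
    :ok
  end
end

children = [
  # 100 ms budget: the supervisor kills the child before terminate/2 returns.
  Supervisor.child_spec({SlowTerminate, self()}, shutdown: 100)
]

{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)
Supervisor.stop(sup)
# :drain_started arrives, but :drain_finished never does.
```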

@doughsay

doughsay commented Dec 5, 2017

Doesn't seem to work; Elixir 1.5.2, Phoenix 1.3. It still kills open connections that are being blocked by :timer.sleep/1, as reported above. Is this just an artifact of using sleep? Will it work in other scenarios? How can I trust it?
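
Another possibility, not verified against Phoenix 1.3: `ranch_listener_sup_pids/1` and `pending_requests/1` walk the supervision tree with `for` comprehensions whose generators pattern-match on child ids such as `Phoenix.Endpoint.Server` and `{:ranch_listener_sup, _}`. In a comprehension, elements that don't match a generator's pattern are skipped rather than raising, so if a newer Phoenix or Ranch arranges its tree differently, both functions quietly return `[]`, nothing is drained, and the log still says "DrainStop Successful, no more connections". The self-contained sketch below shows that filtering on a small tree; the ids and the use of `Agent` as placeholder children are hypothetical.

```elixir
import Supervisor, only: [which_children: 1]

# Hypothetical tree: a listener-like supervisor with two "connection" children,
# next to an unrelated worker. Agents are only long-lived placeholders.
conns = [
  Supervisor.child_spec({Agent, fn -> :conn_a end}, id: :conn_a),
  Supervisor.child_spec({Agent, fn -> :conn_b end}, id: :conn_b)
]

children = [
  %{
    id: {:listener_sup, :http},
    start: {Supervisor, :start_link, [conns, [strategy: :one_for_one]]},
    type: :supervisor
  },
  Supervisor.child_spec({Agent, fn -> :unrelated end}, id: :unrelated)
]

{:ok, root} = Supervisor.start_link(children, strategy: :one_for_one)

# Same shape as DrainStop.pending_requests/1: only children whose id matches
# the pattern are descended into; everything else is skipped without error.
found =
  for {{:listener_sup, _}, sup_pid, _, _} <- which_children(root),
      {_, conn_pid, _, _} <- which_children(sup_pid),
      do: conn_pid

# If the tree names that child differently, the identical walk finds nothing.
missing =
  for {{:ranch_listener_sup, _}, sup_pid, _, _} <- which_children(root),
      {_, conn_pid, _, _} <- which_children(sup_pid),
      do: conn_pid

# => {2, []}
IO.inspect({length(found), missing})
```

In an affected app, calling `DrainStop.pending_requests(MyApp.Endpoint)` from IEx while a slow request is in flight is a quick check: an empty list means the tree walk is not finding the connections it is meant to drain.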

@illiatdesdindes

For people trying to use it with Phoenix 1.3, maybe take a look at https://github.com/lyokato/the_end

@oscarolbe

This is already supported by plug_cowboy (release 2.1.0)
https://hexdocs.pm/plug_cowboy/Plug.Cowboy.Drainer.html
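
With plug_cowboy 2.1.0 or later, `Plug.Cowboy.Drainer` is added to the supervision tree after the listeners it should drain, much like DrainStop. A sketch based on the module's documented options (`:refs` for the Cowboy listener refs, `:shutdown` for how long to wait for connections to drain); `MyApp`, the port, and the `MyApp.HTTP` ref are placeholders (Plug.Cowboy's default ref for an HTTP listener is the plug module name plus `.HTTP`), so check which ref your listener actually registers:

```elixir
children = [
  # The HTTP listener; its Cowboy ref defaults to MyApp.HTTP.
  {Plug.Cowboy, scheme: :http, plug: MyApp, options: [port: 4040]},
  # Started after the listener, so it is stopped first and drains it.
  {Plug.Cowboy.Drainer, refs: [MyApp.HTTP], shutdown: 10_000}
]

Supervisor.start_link(children, strategy: :one_for_one)
```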

@aaronjensen
Author

That's great! I'll update the file at the top to say that this is now supported as of plug_cowboy 2.1.0. Thanks for pointing it out.
