Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Phoenix Drain Stop
# ATTENTION: This is now supported in plug_cowboy as of 2.1.0:
# https://hexdocs.pm/plug_cowboy/Plug.Cowboy.Drainer.html
defmodule DrainStop do
  @moduledoc """
  Attempts to gracefully shut down an endpoint when a normal shutdown occurs.

  It first shuts down the acceptor, ensuring that no new requests can be
  made. It then waits for all pending requests to complete. If the timeout
  expires before this happens, it stops waiting, allowing the supervision tree
  to continue its shutdown order.

  DrainStop should be installed in your supervision tree *after* the
  endpoint it is going to drain stop.

  ## Options

    * `:endpoint` - The `Phoenix.Endpoint` to drain stop. Required.
    * `:timeout` - The amount of time to allow for requests to finish, in
      milliseconds. Defaults to `5000`.

  ## Example

      children = [
        supervisor(MyApp.Endpoint, []),
        worker(
          DrainStop,
          [[timeout: 10_000, endpoint: MyApp.Endpoint]],
          [shutdown: 15_000]
        )
      ]
  """

  use GenServer
  require Logger
  import Supervisor, only: [which_children: 1, terminate_child: 2]

  @doc """
  Starts the drain-stop process.

  `options` is a keyword list; see the module documentation for the
  supported keys.
  """
  def start_link(options) do
    GenServer.start_link(__MODULE__, options)
  end

  @impl true
  def init(options) do
    # Trap exits so that terminate/2 is invoked when the supervisor shuts this
    # process down; the draining itself happens in terminate/2.
    Process.flag(:trap_exit, true)
    endpoint = Keyword.fetch!(options, :endpoint)
    timeout = Keyword.get(options, :timeout, 5000)
    {:ok, {endpoint, timeout}}
  end

  # Only orderly stops drain the endpoint; any other exit reason skips draining.
  @impl true
  def terminate(:shutdown, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout)
  def terminate({:shutdown, _}, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout)
  def terminate(:normal, {endpoint, timeout}), do: drain_endpoint(endpoint, timeout)
  def terminate(_, _), do: :ok

  @doc """
  Stops accepting new connections on `endpoint`, then waits up to `timeout`
  milliseconds for the pending requests to finish.
  """
  def drain_endpoint(endpoint, timeout) do
    stop_listening(endpoint)
    wait_for_requests(endpoint, timeout)
  end

  @doc """
  Blocks the calling process until `endpoint` has no pending requests or
  `timeout` milliseconds have elapsed, whichever comes first.

  Returns `:ok` in both cases; a timeout is reported through the logger.
  """
  def wait_for_requests(endpoint, timeout) do
    Logger.info("DrainStop starting graceful shutdown with timeout: #{timeout}")
    timer_ref = :erlang.start_timer(timeout, self(), :timeout)
    do_wait_for_requests(endpoint, timer_ref, %{})
  end

  # Re-reads the pending request pids, monitors any that are not monitored
  # yet, and waits for the next :DOWN or for the timer to fire.
  defp do_wait_for_requests(endpoint, timer_ref, refs) do
    refs =
      endpoint
      |> pending_requests()
      |> Map.new(fn pid ->
        # Reuse an existing monitor so each request process is monitored once.
        {pid, Map.get_lazy(refs, pid, fn -> Process.monitor(pid) end)}
      end)

    case map_size(refs) do
      0 ->
        Logger.info("DrainStop Successful, no more connections")
        cancel_timer(timer_ref)

      n ->
        # read_timer/1 returns false once the timer has already fired.
        time_left = :erlang.read_timer(timer_ref) || 0
        Logger.info("DrainStop waiting #{time_left} msec for #{n} more connections to shutdown")

        receive do
          {:DOWN, _monitor_ref, _, _, _} ->
            do_wait_for_requests(endpoint, timer_ref, refs)

          {:timeout, ^timer_ref, :timeout} ->
            Logger.error("DrainStop timeout")
            # Drop the remaining monitors so no stray :DOWN messages are left
            # behind if the caller keeps running.
            Enum.each(refs, fn {_pid, ref} -> Process.demonitor(ref, [:flush]) end)
            :ok

          msg ->
            Logger.error("DrainStop unexpected msg: #{inspect(msg)}")
            do_wait_for_requests(endpoint, timer_ref, refs)
        end
    end
  end

  # Cancels the timer and discards a timeout message that may have been
  # delivered just before the cancellation.
  defp cancel_timer(timer_ref) do
    :erlang.cancel_timer(timer_ref)

    receive do
      {:timeout, ^timer_ref, _} -> :ok
    after
      0 -> :ok
    end
  end

  @doc """
  Returns the pids of the processes currently supervised by the
  `:ranch_conns_sup` of every ranch listener under `endpoint`.
  """
  def pending_requests(endpoint) do
    # which_children/1 may report :undefined or :restarting instead of a pid;
    # the is_pid guards skip those entries.
    for listener_pid <- ranch_listener_sup_pids(endpoint),
        {:ranch_conns_sup, conns_sup, _, _} when is_pid(conns_sup) <-
          which_children(listener_pid),
        {_, request_pid, _, _} when is_pid(request_pid) <- which_children(conns_sup),
        do: request_pid
  end

  @doc """
  Returns the pids of the `:ranch_listener_sup` supervisors found under the
  `Phoenix.Endpoint.Server` child of `endpoint`.
  """
  def ranch_listener_sup_pids(endpoint) do
    for {Phoenix.Endpoint.Server, server_pid, _, _} when is_pid(server_pid) <-
          which_children(endpoint),
        {{:ranch_listener_sup, _}, listener_pid, _, _} when is_pid(listener_pid) <-
          which_children(server_pid),
        do: listener_pid
  end

  @doc """
  Terminates the `:ranch_acceptors_sup` child of every ranch listener under
  `endpoint` so that no new connections are accepted. Returns `:ok`.
  """
  def stop_listening(endpoint) do
    endpoint
    |> ranch_listener_sup_pids()
    |> Enum.each(&terminate_child(&1, :ranch_acceptors_sup))
  end
end
defmodule DrainStopTest do
  # Not async: every test binds the same fixed port and writes shared
  # application env.
  use ExUnit.Case, async: false
  use Phoenix.ConnTest

  defmodule TestSupervisor do
    @moduledoc false

    @drain_stop_timeout 100

    # Starts the test endpoint followed by a DrainStop worker, mirroring the
    # ordering documented in DrainStop.
    def start_link do
      import Supervisor.Spec, warn: false

      children = [
        # Start the endpoint when the application starts
        supervisor(DrainStopTest.TestEndpoint, []),
        worker(
          DrainStop,
          [[timeout: @drain_stop_timeout, endpoint: DrainStopTest.TestEndpoint]],
          # Give the worker twice the drain timeout before it is killed.
          [shutdown: @drain_stop_timeout * 2]
        )
      ]

      opts = [strategy: :one_for_one]
      Supervisor.start_link(children, opts)
    end
  end

  defmodule TestPlug do
    @moduledoc false

    def init(opts), do: opts

    # Notifies the test process when the request starts and ends, sleeping for
    # the number of milliseconds given in the "sleep" query parameter.
    def call(conn, _) do
      pid = Application.get_env(:drain_stop_test, :test_pid)
      conn = Plug.Conn.fetch_query_params(conn)
      send(pid, :request_start)
      {time, _} = Integer.parse(conn.params["sleep"])
      :timer.sleep(time)
      send(pid, :request_end)
      conn
    end
  end

  defmodule TestEndpoint do
    use Phoenix.Endpoint, otp_app: :drain_stop_test
    plug DrainStopTest.TestPlug
  end

  @endpoint DrainStopTest.TestEndpoint

  setup do
    Application.put_env(:drain_stop_test, :test_pid, self())

    Application.put_env(:drain_stop_test, DrainStopTest.TestEndpoint,
      http: [port: "4807"],
      url: [host: "example.com"],
      server: true
    )

    {:ok, pid} = DrainStopTest.TestSupervisor.start_link()
    Process.flag(:trap_exit, true)

    on_exit(fn ->
      if Process.whereis(DrainStopTest.TestEndpoint) do
        Supervisor.stop(DrainStopTest.TestEndpoint)
      end

      # :kill is the untrappable exit reason. :brutal_kill is only a child-spec
      # shutdown value and would be ignored by an exit-trapping supervisor.
      Process.exit(pid, :kill)
    end)

    {:ok, pid: pid}
  end

  test "waits for request to finish", %{pid: pid} do
    Task.async(fn ->
      HTTPoison.get("http://localhost:4807/?sleep=50")
    end)

    assert_receive :request_start, 1000
    Supervisor.stop(pid, :shutdown)
    assert_received :request_end
  end

  test "truncates requests that don't finish in time", %{pid: pid} do
    Task.async(fn ->
      HTTPoison.get("http://localhost:4807/?sleep=500")
    end)

    assert_receive :request_start, 1000
    Supervisor.stop(pid, :shutdown)
    refute_received :request_end
  end

  test "does not allow new requests" do
    # This is harder to test without reaching in to internals...
    DrainStop.stop_listening(DrainStopTest.TestEndpoint)

    assert_raise(HTTPoison.Error, fn ->
      HTTPoison.get!("http://localhost:4807/?sleep=500")
    end)
  end
end
@drewblas

This comment has been minimized.

Copy link

@drewblas drewblas commented May 27, 2016

Thanks for this. I think it'll be a big help for us!

@mmmries

This comment has been minimized.

Copy link

@mmmries mmmries commented Jun 25, 2016

Thanks for posting this 👍 very helpful for a use case that I have.

@xingxing

This comment has been minimized.

Copy link

@xingxing xingxing commented Sep 18, 2016

Thanks a lot for this work! But how can I use it? I added DrainStop to the children list following the doc.

And run

iex -S mix phoenix.server
Erlang/OTP 19 [erts-8.0.2] [source-9503fff] [64-bit] [smp:4:4] [async-threads:10] [hipe] [kernel-poll:false]

[info] Running MyPro.Endpoint with Cowboy using http://localhost:4000
Interactive Elixir (1.3.2) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> Application.stop(:my_pro)
[info] stop listening
[info] DrainStop starting graceful shutdown with timeout: 10000
[info] DrainStop Successful, no more connections
:ok
[info] Application star_way_bff exited: :stopped

Perfect! It works! But how can I do this in production? The Phoenix server will be started as a daemon, so how can I stop the application?
Is there a mechanism in Elixir that can handle Unix kill signals?
Do you guys have a better idea?

@PerishableDave

This comment has been minimized.

Copy link

@PerishableDave PerishableDave commented Jul 20, 2017

@xingxing not sure if you're still looking for a solution. But I found https://github.com/tsutsu/signal_handler which lets Elixir applications intercept POSIX signals.

@nicolasgarnil

This comment has been minimized.

Copy link

@nicolasgarnil nicolasgarnil commented Aug 2, 2017

@aaronjensen are you planning to package this functionality into a library?

@jesseshieh

This comment has been minimized.

Copy link

@jesseshieh jesseshieh commented Sep 29, 2017

Anybody know if this works with Elixir 1.4.5 and Phoenix 1.3? It doesn't seem to work for me. I put a :timer.sleep/1 in a controller and when I call :init.stop() the connection is severed.

@doughsay

This comment has been minimized.

Copy link

@doughsay doughsay commented Dec 5, 2017

Doesn't seem to work; Elixir 1.5.2, Phoenix 1.3. It still kills open connections that are being blocked by :timer.sleep/1, as reported above. Is this just an artifact of using sleep? Will it work in other scenarios? How can I trust it?

@illiatdesdindes

This comment has been minimized.

Copy link

@illiatdesdindes illiatdesdindes commented Jan 22, 2018

For people trying to use it with Phoenix 1.3, maybe take a look at https://github.com/lyokato/the_end

@oscarolbe

This comment has been minimized.

Copy link

@oscarolbe oscarolbe commented May 20, 2020

This is already supported by plug_cowboy (release 2.1.0)
https://hexdocs.pm/plug_cowboy/Plug.Cowboy.Drainer.html

@aaronjensen

This comment has been minimized.

Copy link
Owner Author

@aaronjensen aaronjensen commented May 20, 2020

That's great! I'll update the file at the top to say that this is now supported as of plug_cowboy 2.1.0. Thanks for pointing it out.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment