Skip to content

Instantly share code, notes, and snippets.

@smaximov
Last active August 22, 2019 09:36
Show Gist options
  • Save smaximov/5307d6cead522384c0a7b045acdd8ef8 to your computer and use it in GitHub Desktop.
Save smaximov/5307d6cead522384c0a7b045acdd8ef8 to your computer and use it in GitHub Desktop.
HTTP и WebSocket Reverse Proxy

Background: это было написано для "фронт-энд" сервиса, который должен был проксировать HTTP и WebSocket-запросы на несколько внутренних сервисов.

Для того, чтобы переопределить то, как Phoenix обрабатывает вебсокеты, надо поменять настройки HTTP сервера (Cowboy), а именно настройки роутинга (:dispatch) — см. config.exs. Тут мы указываем, что для любого хоста: а) для пути "/aaa/websocket" вызывается хендлер API.Gateway.WSReverseProxy, б) для любого другого пути вызывается дефолтный хэндлер Phoenix.

API.Gateway.WSReverseProxy — это хэндлер Cowboy, для подробной информации о реализуемых колбэках см. поведение :cowboy_websocket. Логика его работы следующая. На каждое вебсокет-соединение создаётся два процесса: процесс самого хэндлера и процесс клиента, который ответственен за проксирование запроса, общаются они между через передачу сообщений. При открытии вебсокет-соединения вызывается колбэк init/2, в котором мы указываем, что хэндлер предназначен для обработки вебсокетов (см. возвращаемое значение). При этом мы откладываем обработку первичного запроса на соединение на момент создания клиента, складывая сам запрос в стейт. Это нужно потому, что на момент выполнения init/2 процесс хэндлера ещё не создан, т.е. мы не знаем его PID, который понадобится для того, чтобы клиент мог возвращать запросы с бэкенд-сервиса браузеру через хэндлер.

Затем вызывается websocket_init/1, который уже используется для инициализации созданного процесса хэндлера, тут мы создаём процесс клиента через API.Gateway.WSReverseProxy.ClientSupervisor. Клиент API.Gateway.WSReverseProxy.Client реализован на основе Websockex. При инициализации клиент определяет параметры соединения с помощью модуля, реализующего API.Gateway.WSReverseProxy.CallbackModule (для примера см. API.Gateway.WSReverseProxy.AAA).

Для проксирования HTTP-запросов используется модуль Plugs.ReverseProxy через forward в роутере.

defmodule API.Gateway.WSReverseProxy.AAA do
@moduledoc """
Callback module to proxy WebSocket connections to the AAA microservice.
"""
use API.Gateway.WSReverseProxy.CallbackModule
alias API.Gateway.Config
@impl API.Gateway.WSReverseProxy.CallbackModule
def websocket_endpoint(_req, _opts) do
Config.aaa_base_url()
|> sub_scheme()
|> URI.merge("/aaa/websocket")
end
@impl API.Gateway.WSReverseProxy.CallbackModule
def conn_options(req, _opts) do
case :cowboy_req.header("cookie", req) do
:undefined ->
[]
cookie ->
[extra_headers: [{"cookie", cookie}]]
end
end
end
defmodule API.Gateway.Application do
@moduledoc false
use Application
alias API.Gateway.Endpoint
@impl Application
def start(_type, _args) do
children = [
API.Gateway.Config,
{Registry, keys: :unique, name: API.Gateway.WSReverseProxy.Registry},
API.Gateway.WSReverseProxy.ClientSupervisor,
Endpoint
]
opts = [strategy: :one_for_one, name: API.Gateway.Supervisor]
Supervisor.start_link(children, opts)
end
@impl Application
def config_change(changed, _new, removed) do
Endpoint.config_change(changed, removed)
:ok
end
end
defmodule API.Gateway.WSReverseProxy.CallbackModule do
@moduledoc """
This behavior describes how to connect to the WebSocket endpoint of a specific
backend service.
"""
alias API.Gateway.WSReverseProxy, as: WebSocketHandler
@type endpoint :: String.t() | URI.t()
@callback websocket_endpoint(initial_req :: WebSocketHandler.req(), opts :: keyword) :: endpoint
@callback conn_options(initial_req :: WebSocketHandler.req(), opts :: keyword) ::
WebSockex.options()
@optional_callbacks conn_options: 2
@doc false
defmacro __using__(_opts) do
quote do
@behaviour API.Gateway.WSReverseProxy.CallbackModule
import API.Gateway.WSReverseProxy.CallbackModule, only: [sub_scheme: 1]
def conn_options(_req, _opts), do: []
defoverridable conn_options: 2
end
end
@spec sub_scheme(endpoint) :: URI.t()
@doc """
Swap `http://` and `https://` with `ws://` and `wss://`, respectively, for
the given URL.
## Examples
```
iex> uri = API.Gateway.WSReverseProxy.CallbackModule.sub_scheme("http://example.com/ws")
iex> uri.scheme
"ws"
iex> uri = API.Gateway.WSReverseProxy.CallbackModule.sub_scheme("https://example.com/ws")
iex> uri.scheme
"wss"
```
iex> uri = API.Gateway.WSReverseProxy.CallbackModule.sub_scheme(%URI{scheme: "http", host: "example.com", path: "ws"})
iex> uri.scheme
"ws"
iex> uri = API.Gateway.WSReverseProxy.CallbackModule.sub_scheme(%URI{scheme: "https", host: "example.com", path: "ws"})
iex> uri.scheme
"wss"
"""
def sub_scheme(endpoint)
def sub_scheme(url) when is_binary(url), do: sub_scheme(URI.parse(url))
def sub_scheme(%URI{scheme: "http"} = uri), do: %URI{uri | scheme: "ws"}
def sub_scheme(%URI{scheme: "https"} = uri), do: %URI{uri | scheme: "wss"}
end
defmodule API.Gateway.WSReverseProxy.Client do
@moduledoc """
Websocket client.
"""
use WebSockex, restart: :temporary
alias API.Gateway.WSReverseProxy, as: WebSocketHandler
alias API.Gateway.WSReverseProxy.Registry, as: WSRegistry
require Logger
defmodule State do
@moduledoc false
@enforce_keys [:ref]
defstruct [:ref]
@type t :: %__MODULE__{ref: reference}
end
@opaque state :: State.t()
@type opts :: {WebSocketHandler.state(), pid}
@type on_start :: {:ok, pid} | {:error, term}
@type call_result(state) ::
{:ok, state}
| {:reply, WebSockex.frame(), state}
| {:close, state}
| {:close, WebSockex.close_frame(), state}
@spec start_link(opts) :: on_start
def start_link(opts)
def start_link({state, handler}) do
%WebSocketHandler.State{
callback_module: callback_module,
ref: ref,
opts: opts,
initial_req: initial_req
} = state
name = {:via, Registry, {WSRegistry, ref, handler}}
url = callback_module.websocket_endpoint(initial_req, opts)
conn_opts = callback_module.conn_options(initial_req, opts)
state = %State{ref: ref}
client_opts = Keyword.merge(conn_opts, name: name)
WebSockex.start_link(url, __MODULE__, state, client_opts)
end
@impl WebSockex
@spec handle_frame(WebSockex.frame(), state) :: call_result(state)
def handle_frame(frame, state)
def handle_frame({:text, msg}, state) do
Logger.debug("websocket client #{inspect(state.ref)} received text frame: #{msg}")
case Registry.lookup(WSRegistry, state.ref) do
[{_, handler_pid}] ->
send(handler_pid, {:proxy, {:text, msg}})
end
{:ok, state}
end
end
defmodule API.Gateway.WSReverseProxy.ClientSupervisor do
@moduledoc """
Dynamic supervisor to handle clients for proxied websocket connections.
"""
use DynamicSupervisor
alias API.Gateway.WSReverseProxy, as: WebSocketHandler
alias API.Gateway.WSReverseProxy.Client
def start_link(arg) do
DynamicSupervisor.start_link(__MODULE__, arg, name: __MODULE__)
end
@impl DynamicSupervisor
def init(_arg) do
DynamicSupervisor.init(strategy: :one_for_one)
end
@spec start_child(WebSocketHandler.state(), pid) :: DynamicSupervisor.on_start_child()
def start_child(state, handler) do
init_arg = {state, handler}
DynamicSupervisor.start_child(__MODULE__, {Client, init_arg})
end
def terminate_child(pid) do
DynamicSupervisor.terminate_child(__MODULE__, pid)
end
end
dispatch = [
_: [
{
"/aaa/websocket",
API.Gateway.WSReverseProxy,
callback_module: API.Gateway.WSReverseProxy.AAA
},
{:_, Phoenix.Endpoint.Cowboy2Handler, {API.Gateway.Endpoint, []}}
]
]
# Configures the endpoint
config :api_gateway, API.Gateway.Endpoint,
url: [host: "localhost"],
secret_key_base:
"1f4d55e7ed0b23de2d25eb6f64e057b1837b40b66f40e8abee601956e479f2028f70689389dedb6914184ccc2e3bc402ed950c4f02794d4bfdda3c4c42f62142",
render_errors: [view: API.Gateway.ErrorView, accepts: ~w(json)],
http: [dispatch: dispatch],
pubsub: [name: API.Gateway.PubSub, adapter: Phoenix.PubSub.PG2]
defmodule Plugs.ReverseProxy do
@moduledoc """
Proxy external requests to backend services.
## Example
```
forward "/api", Plugs.ReverseProxy, base_url: {AppConfig, :get_base_url}
```
"""
@behaviour Plug
import Plug.Conn
alias Plug.Conn
require Logger
defmodule Opts do
@moduledoc false
@enforce_keys [:base_url]
defstruct [:base_url]
@type t :: %__MODULE__{
base_url: String.t() | {module(), atom()}
}
@spec base_url(t()) :: String.t()
def base_url(%__MODULE__{base_url: base_url})
when is_binary(base_url) do
base_url
end
def base_url(%__MODULE__{base_url: {mod, fun}})
when is_atom(mod) and is_atom(fun) do
apply(mod, fun, [])
end
end
@typep request :: {
Tesla.Env.method(),
Tesla.Env.url(),
Tesla.Env.headers(),
Tesla.Env.body()
}
@typep read_body_result :: {:ok, binary(), Conn.t()} | {:more, binary(), Conn.t()}
@impl Plug
@spec init(Plug.opts()) :: Opts.t()
def init(opts) when is_list(opts) do
{:ok, base_url} = Keyword.fetch(opts, :base_url)
%Opts{base_url: base_url}
end
@impl Plug
@spec call(Conn.t(), Opts.t()) :: Conn.t()
def call(conn, opts) do
conn
|> prepare_request(opts)
|> perform_request()
|> handle_response(conn)
end
@spec prepare_request(Conn.t(), Opts.t()) :: request()
defp prepare_request(conn, opts) do
%Conn{
method: method,
request_path: path,
query_string: query
} = conn
method = tesla_method(method)
url =
opts
|> Opts.base_url()
|> build_proxy_url(path, query)
headers = prepare_request_headers(conn)
body =
conn
|> read_body()
|> maybe_stream_body()
{method, url, headers, body}
end
@spec tesla_method(Conn.method()) :: Tesla.Env.method()
for tesla_method <- ~w[head get delete trace options post put patch]a do
plug_method = tesla_method |> Atom.to_string() |> String.upcase()
defp tesla_method(unquote(plug_method)), do: unquote(tesla_method)
end
@spec build_proxy_url(String.t(), String.t(), String.t()) :: String.t()
defp build_proxy_url(base_url, path, query) do
query = normalize_query(query)
base_url
|> URI.merge(%URI{path: path, query: query})
|> URI.to_string()
end
@spec normalize_query(String.t()) :: String.t() | nil
defp normalize_query(""), do: nil
defp normalize_query(query), do: query
@spec prepare_request_headers(Conn.t()) :: Conn.headers()
defp prepare_request_headers(conn) do
conn
|> delete_req_header("transfer-encoding")
|> delete_req_header("host")
|> Map.get(:req_headers)
end
@spec maybe_stream_body(read_body_result()) :: Tesla.Env.body()
defp maybe_stream_body({:ok, body, _conn}), do: body
defp maybe_stream_body({:more, body, conn}) do
Stream.resource(
fn -> {body, conn} end,
fn
# Initial chunk
{body, conn} ->
{[body], conn}
# After the last chunk is read
nil ->
{:halt, nil}
# Process the next chunk
conn ->
case read_body(conn) do
{:ok, body, _conn} ->
# Signal the last chunk is read
{[body], nil}
{:more, body, conn} ->
{[body], conn}
end
end,
fn _acc -> nil end
)
end
@spec perform_request(request()) :: Tesla.Env.result()
defp perform_request({method, url, headers, body}) do
Logger.debug(fn ->
payload =
inspect(method: method, url: url, header: headers, body: inspect(body, limit: 256))
"reverse proxy request: #{payload}"
end)
Tesla.request(method: method, url: url, body: body, headers: headers)
end
@spec handle_response(Tesla.Env.result(), Conn.t()) :: Conn.t()
defp handle_response({:ok, %Tesla.Env{status: status, headers: headers, body: body}}, conn) do
headers
|> Enum.reduce(
conn,
fn {header, value}, conn ->
put_resp_header(conn, header, value)
end
)
|> delete_resp_header("transfer-encoding")
|> send_resp(status, body)
end
defp handle_response({:error, error}, conn) do
report_error(error)
conn
|> put_resp_header("content-type", "text/plain")
|> send_resp(503, "")
end
defp report_error(error) do
error =
error
|> :file.format_error()
|> case do
'unknown POSIX error' -> inspect(error)
error -> to_string(error)
end
Bugsnag.report(error, severity: "error", context: "reverse proxy")
Logger.error("reverse proxy request failure: #{error}")
end
@proxy_module (quote do
@moduledoc false
@behaviour Plug
@impl Plug
defdelegate init(opts), to: Plugs.ReverseProxy
@impl Plug
defdelegate call(plug, opts), to: Plugs.ReverseProxy
end)
# HACK(smaximov):
# we cannot forward to a single plug multiple times inside a router; this is
# a Phoenix router limitation https://github.com/phoenixframework/phoenix/pull/1419
@doc false
def define_proxy_module(module) when is_atom(module),
do: Module.create(module, @proxy_module, __ENV__)
end
Plugs.ReverseProxy.define_proxy_module(Plugs.ReverseProxy.AAA)
Plugs.ReverseProxy.define_proxy_module(Plugs.ReverseProxy.Payments)
Plugs.ReverseProxy.define_proxy_module(Plugs.ReverseProxy.Reviews)
defmodule API.Gateway.WSReverseProxy do
@moduledoc """
Raw websocket handler to proxy websocket requests to backend services.
"""
@behaviour :cowboy_websocket
alias API.Gateway.WSReverseProxy.ClientSupervisor
alias API.Gateway.WSReverseProxy.Registry, as: WSRegistry
require Logger
defmodule State do
@moduledoc false
@enforce_keys [:ref]
defstruct [:ref, :callback_module, :opts, initial_req: nil]
@type t :: %__MODULE__{
ref: reference,
callback_module: module | nil,
opts: keyword | nil,
initial_req: :cowboy_req.req() | nil
}
end
@opaque state :: State.t()
@type req :: :cowboy_req.req()
@type opts :: keyword
@type on_init :: {:cowboy_websocket, req(), state}
@type frame ::
:ping
| :pong
| {:text | :binary | :ping | :pong, binary()}
@type call_result :: :cowboy_websocket.call_result(state)
@type terminate_reason :: :cowboy_websocket.terminate_reason()
@impl :cowboy_websocket
@spec init(req, opts) :: on_init
def init(req, opts) do
ref = make_ref()
Logger.debug("new websocket request #{inspect(ref)}: #{inspect(req)}")
{callback_module, opts} = Keyword.pop(opts, :callback_module)
state = %State{ref: ref, callback_module: callback_module, opts: opts, initial_req: req}
{:cowboy_websocket, req, state}
end
@impl :cowboy_websocket
@spec websocket_init(state) :: call_result
def websocket_init(state) do
{:ok, _} = ClientSupervisor.start_child(state, self())
new_state = %State{ref: state.ref}
{:ok, new_state}
end
@impl :cowboy_websocket
@spec websocket_handle(frame, state) :: call_result
def websocket_handle(frame, state)
def websocket_handle({:text, msg}, state) do
Logger.debug("message from websocket #{inspect(state.ref)}: #{msg}")
case Registry.lookup(WSRegistry, state.ref) do
[{client_pid, _}] ->
:ok = WebSockex.send_frame(client_pid, {:text, msg})
end
{:ok, state}
end
def websocket_handle(frame, state) do
Logger.debug("frame from websocket #{inspect(state.ref)}: #{inspect(frame)}")
{:ok, state}
end
@impl :cowboy_websocket
@spec websocket_info(term, state) :: call_result
def websocket_info(info, state)
def websocket_info({:proxy, {:text, msg}}, state) do
Logger.debug("message for websocket #{inspect(state.ref)}: #{msg}")
{:reply, {:text, msg}, state}
end
def websocket_info(info, state) do
Logger.debug("Erlang message for websocket #{inspect(state.ref)}: #{inspect(info)}")
{:ok, state}
end
@impl :cowboy_websocket
@spec terminate(terminate_reason, map, state) :: :ok
def terminate(reason, partial_req, state) do
Logger.debug("""
websocket #{inspect(state.ref)} terminated: #{inspect(partial_req)}
reason: #{inspect(reason)}
""")
with [{client_pid, _}] <- Registry.lookup(WSRegistry, state.ref) do
ClientSupervisor.terminate_child(client_pid)
end
:ok
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment