Skip to content

Instantly share code, notes, and snippets.

@myobie
Last active November 19, 2020 14:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save myobie/9d5e1c47ceb6511f57499ab3dc256784 to your computer and use it in GitHub Desktop.
Save myobie/9d5e1c47ceb6511f57499ab3dc256784 to your computer and use it in GitHub Desktop.
Gentle request task in elixir
defmodule Gentle.Application do
  @moduledoc """
  OTP application entry point for Gentle.

  Starts a top-level supervisor (registered as `Gentle.Supervisor`) whose only
  child is a `Task.Supervisor` registered as `Gentle.TaskSupervisor`.
  """

  use Application

  @doc """
  Starts the supervision tree. Both arguments are ignored.
  """
  @impl true
  def start(_type, _args) do
    # A single child: the task supervisor, registered under a well-known name.
    task_supervisor = {Task.Supervisor, name: Gentle.TaskSupervisor}

    Supervisor.start_link(
      [task_supervisor],
      strategy: :one_for_one,
      name: Gentle.Supervisor
    )
  end
end
defmodule Gentle.Request do
  @moduledoc """
  Description of a single HTTP request.

  Fields:

    * `:scheme`, `:address`, `:port` - where to connect
    * `:method`, `:path`, `:headers`, `:body` - what to send
    * `:timeout` - time budget in milliseconds (defaults to `1_000`)

  `:headers` defaults to an empty list so that a request built without
  explicit headers still satisfies the declared list type. `:body` defaults to
  `nil`.
  """

  defstruct [:scheme, :address, :port, :method, :path, :body, headers: [], timeout: 1_000]

  @type t :: %__MODULE__{
          scheme: Mint.Types.scheme(),
          address: :inet.ip_address() | :inet.local_address() | String.t(),
          port: pos_integer,
          method: String.t(),
          path: String.t(),
          headers: [{String.t(), String.t()}],
          body: binary | nil,
          timeout: pos_integer
        }
end
defmodule Gentle.Response do
  @moduledoc """
  Accumulated result of an HTTP request.

  A fresh response starts with status `0`, no headers, an empty body and no
  errors; the fields are filled in as response parts are gathered.
  """

  defstruct status: 0, headers: [], body: "", errors: []

  # `status` is `non_neg_integer` (not `pos_integer`) because the default
  # value, used before any status line has been seen, is 0.
  @type t :: %__MODULE__{
          status: non_neg_integer,
          headers: [{String.t(), String.t()}],
          body: binary,
          errors: [term]
        }
end
defmodule Gentle.Task do
  @moduledoc """
  Runs one HTTP request through Mint inside a task started under
  `Gentle.TaskSupervisor`.

  Each task starts a small linked "watchdog" process (see
  `start_graceful_timeout/2`). When the request's timeout elapses the watchdog
  asks the task to stop; if the task does not acknowledge quickly the watchdog
  exits abnormally, which takes the linked task down with it.
  """

  alias Gentle.TaskSupervisor, as: Supervisor
  alias Gentle.{Request, Response}

  @type conn :: Mint.HTTP.t()
  @type responses :: [Mint.Types.response()]

  # Milliseconds the watchdog waits for `:timeout_confirmation` before exiting
  # with `:timeout_exceeded`.
  @confirmation_grace 50

  @doc """
  Performs `request` and blocks until its result is available.

  Equivalent to `async/1` followed by `await/1`.
  """
  @spec sync(Request.t()) :: {:ok, Response.t()} | {:error, term}
  def sync(request), do: request |> async() |> await()

  @doc """
  Starts `request` in an unlinked task under `Gentle.TaskSupervisor`.

  The returned task must be passed to `await/1` from the calling process.
  """
  @spec async(Request.t()) :: Task.t()
  def async(request) do
    Task.Supervisor.async_nolink(Supervisor, fn ->
      # NOTE: use an async timeout process to kill the request if it takes too
      # long and ensure it confirms the timeout
      timeout_pid = start_graceful_timeout(self(), request.timeout)
      result = perform(request)

      # Release the watchdog on every outcome, not only on success; otherwise
      # it would linger until its timer fires. If it has already exited the
      # message is simply dropped.
      send(timeout_pid, :complete)
      result
    end)
  end

  @doc """
  Waits for the result of a task started with `async/1`.

  Returns the task's result, or `{:error, reason}` if the task process went
  down before replying. Raises `ArgumentError` when called from a process
  other than the one that started the task.
  """
  @spec await(Task.t()) :: {:ok, Response.t()} | {:error, term}
  def await(%Task{ref: ref, owner: owner}) do
    if owner != self() do
      raise ArgumentError, "Cannot wait on a task started in a different process"
    end

    receive do
      {^ref, result} ->
        # The reply arrived, so the monitor (and any queued :DOWN) is no
        # longer needed.
        Process.demonitor(ref, [:flush])
        result

      {:DOWN, ^ref, _proc, _pid, reason} ->
        {:error, reason}
    end
  end

  @doc """
  Spawns a watchdog process linked to the caller and returns its pid.

  The watchdog waits up to `timeout` milliseconds for a `:complete` message.
  If none arrives it sends `{:timeout, watchdog_pid}` to `pid` and waits a
  short grace period for `:timeout_confirmation`; without confirmation it
  exits with reason `:timeout_exceeded`.
  """
  @spec start_graceful_timeout(pid, pos_integer) :: pid
  def start_graceful_timeout(pid, timeout) do
    spawn_link(fn ->
      receive do
        :complete -> nil
      after
        timeout ->
          send(pid, {:timeout, self()})

          receive do
            :timeout_confirmation -> nil
          after
            @confirmation_grace -> exit(:timeout_exceeded)
          end
      end
    end)
  end

  # Connects, sends the request, streams the response and closes the
  # connection, returning the gathered response or an error tuple.
  @spec perform(Request.t()) :: {:ok, Response.t()} | {:error, term}
  defp perform(request) do
    with {:ok, conn} <- Mint.HTTP.connect(request.scheme, request.address, request.port),
         {:ok, conn, request_ref} <-
           Mint.HTTP.request(
             conn,
             request.method,
             request.path,
             request.headers,
             request.body
           ),
         {:ok, responses} <- conn |> stream(request_ref, []) |> close_conn() do
      gather_response(%Response{}, responses)
    else
      # NOTE: intercept an error result from Mint.HTTP.request and make sure the conn is closed
      {:error, conn, error} -> close_conn({:error, conn, error})
      other -> other
    end
  end

  # Keeps receiving messages until the last collected response part is either
  # `{:done, ref}` or `{:error, ref, reason}` for this request.
  @spec stream(conn, Mint.Types.request_ref(), responses()) ::
          {:ok, conn, responses()}
          | {:error, conn, term}
  defp stream(conn, request_ref, existing_responses) do
    with {:ok, conn, responses} <- receive_stream(conn) do
      responses = existing_responses ++ responses

      case List.last(responses) do
        {:done, ^request_ref} ->
          {:ok, conn, responses}

        {:error, ^request_ref, reason} ->
          {:error, conn, [reason: reason, responses: responses]}

        _ ->
          stream(conn, request_ref, responses)
      end
    else
      {:error, conn, reason, responses} ->
        {:error, conn, [reason: reason, responses: existing_responses ++ responses]}

      # Anything else (e.g. `:unknown` for a message Mint does not recognise)
      # aborts the request with that value as the error.
      error ->
        {:error, conn, error}
    end
  end

  # Takes the next message from the mailbox: a watchdog timeout is
  # acknowledged and turned into an error, everything else is handed to Mint.
  @spec receive_stream(conn) ::
          {:ok, conn, responses}
          | {:error, conn, Mint.Types.error(), responses}
          | :unknown
  defp receive_stream(conn) do
    # NOTE: We have a special :timeout message to help us possibly end a
    # request early
    receive do
      {:timeout, pid} ->
        send(pid, :timeout_confirmation)
        {:error, conn, :timeout, []}

      message ->
        Mint.HTTP.stream(conn, message)
    end
  end

  # Closes the connection and drops it from the result tuple.
  @spec close_conn({:ok, conn, responses} | {:error, conn, term}) ::
          {:ok, responses} | {:error, term}
  defp close_conn({:ok, conn, responses}) do
    {:ok, _conn} = Mint.HTTP.close(conn)
    {:ok, responses}
  end

  defp close_conn({:error, conn, error}) do
    {:ok, _conn} = Mint.HTTP.close(conn)
    {:error, error}
  end

  # Folds the response parts into a `Response`. The result is tagged `:ok`
  # when no errors were collected and `:error` otherwise.
  defp gather_response(%Response{errors: []} = response, []), do: {:ok, response}
  defp gather_response(%Response{} = response, []), do: {:error, response}

  defp gather_response(response, [{:status, _, code} | remaining]) do
    gather_response(%{response | status: code}, remaining)
  end

  defp gather_response(%{headers: existing_headers} = response, [
         {:headers, _, headers} | remaining
       ]) do
    gather_response(%{response | headers: existing_headers ++ headers}, remaining)
  end

  defp gather_response(%{body: body} = response, [{:data, _, data} | remaining]) do
    gather_response(%{response | body: body <> data}, remaining)
  end

  # `:done` ends the response; any parts after it are ignored.
  defp gather_response(response, [{:done, _} | _]), do: gather_response(response, [])

  defp gather_response(%{errors: errors} = response, [{:error, _, reason} | remaining]) do
    gather_response(%{response | errors: errors ++ [reason]}, remaining)
  end

  defp gather_response(%{errors: errors} = response, [{:pong, _} | remaining]) do
    # Tag spelled `:unexpected_response`, consistent with the clauses below.
    gather_response(%{response | errors: errors ++ [{:unexpected_response, :pong}]}, remaining)
  end

  defp gather_response(%{errors: errors} = response, [{:push_promise, _, _, _} | remaining]) do
    gather_response(
      %{response | errors: errors ++ [{:unexpected_response, :push_promise}]},
      remaining
    )
  end

  defp gather_response(%{errors: errors} = response, [unknown | remaining]) do
    gather_response(%{response | errors: errors ++ [{:unexpected_response, unknown}]}, remaining)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment