Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Stream HTTP body with Elixir Mint
defmodule Downloader do
  @moduledoc """
  Download streams of bytes from URLs.

  Useful to transfer large files with low RAM usage.

  ## Example with `ExAws.S3.upload/3`

  ```elixir
  url
  |> Downloader.stream_body!()
  |> Downloader.chunk_bytes(5_000_000)
  |> ExAws.S3.upload(s3_bucket, filename, opts)
  |> ExAws.request!()
  ```
  """

  # Per-request state threaded through the `Stream.resource/3` accumulator.
  #
  # `body_length` counts the bytes actually received, `content_length` is the
  # value announced by the server, and `url` is the URL of the current request
  # (needed to resolve relative `Location` headers).
  #
  # NOTE(security): `transport_opts` defaults to `verify: :verify_none`, which
  # disables TLS certificate verification. The default is kept so existing
  # callers keep working; pass `transport_opts: [verify: :verify_peer, ...]`
  # to `stream_body!/2` to enable verification.
  defstruct body_length: 0,
            completed: false,
            content_length: 0,
            max_body_length: 200 * 1024 * 1024,
            max_redirect: 3,
            receive_timeout: 10_000,
            redirect_count: 0,
            redirect_location: nil,
            status: 0,
            transport_opts: [verify: :verify_none],
            url: nil

  # Options accepted from callers and carried over from one request to the
  # next when following a redirect.
  @opts [:max_body_length, :max_redirect, :receive_timeout, :redirect_count, :transport_opts]

  @doc """
  Creates a stream to download files from a URL in chunks.

  Raises on 4xx and 5xx responses and when there are too many redirects.
  Also raises connection errors.

  ## Options

    * `:max_body_length` - maximum accepted body size in bytes, checked against
      both the `content-length` header and the bytes actually received
      (default: 200 MiB).
    * `:max_redirect` - maximum number of redirects to follow (default: `3`).
    * `:receive_timeout` - milliseconds to wait for the next message from the
      socket (default: `10_000`).
    * `:transport_opts` - transport options given to `Mint.HTTP.connect/4` for
      `https` URLs (default: `[verify: :verify_none]`, which does NOT verify
      the server certificate).

  ## Example

      url = "http://www.example.com/video.mp4"
      stream = Downloader.stream_body!(url)
      body = Enum.join(stream)
  """
  @spec stream_body!(String.t(), keyword()) :: Enumerable.t()
  def stream_body!(url, opts \\ []) do
    Stream.resource(
      fn -> start_request!(url, opts) end,
      &process_request/1,
      &finish_request!/1
    )
  end

  # Builds the request state and opens the connection, unless the redirect
  # budget is exhausted. `redirect_count` is 0 for the initial request, so `<=`
  # lets `max_redirect: 0` mean "perform the request but follow no redirects".
  defp start_request!(url, opts) do
    state = struct(__MODULE__, [{:url, url} | Keyword.take(opts, @opts)])

    if state.redirect_count <= state.max_redirect do
      do_start!(url, state)
    else
      raise http_error(:too_many_redirects)
    end
  end

  # Connects and sends the GET request. Returns `{request_result, state}` where
  # `request_result` is whatever `Mint.HTTP.request/5` returned; connection
  # errors are raised immediately.
  defp do_start!(url, state) do
    uri = URI.parse(url)
    scheme = scheme_atom(uri.scheme)
    # TLS options only make sense for the SSL transport.
    connect_opts = if scheme == :https, do: [transport_opts: state.transport_opts], else: []

    case Mint.HTTP.connect(scheme, uri.host, uri.port, connect_opts) do
      {:ok, conn} ->
        # A GET carries no body, so `nil` is passed rather than empty iodata.
        {Mint.HTTP.request(conn, "GET", path(uri), [], nil), state}

      {:error, error} ->
        raise error
    end
  end

  # `after_fun` of the stream: always closes the connection and re-raises any
  # error recorded while streaming.
  defp finish_request!({{:error, conn, error}, _state}) do
    Mint.HTTP.close(conn)
    raise error
  end

  defp finish_request!({{:ok, conn, _ref}, _state}) do
    Mint.HTTP.close(conn)
  end

  # `next_fun` of the stream: decides, from the state gathered so far, whether
  # to follow a redirect, halt (with or without an error) or wait for more data.
  defp process_request({{:error, _conn, _error}, _state} = acc), do: {:halt, acc}

  defp process_request({{:ok, conn, ref}, %__MODULE__{status: status} = state}) do
    cond do
      status in 300..399 and is_binary(state.redirect_location) ->
        Mint.HTTP.close(conn)

        # `Location` may be relative, so resolve it against the current URL.
        state.url
        |> URI.merge(state.redirect_location)
        |> URI.to_string()
        |> start_request!(redirect_opts(state))
        |> process_request()

      status in 400..499 ->
        {:halt, {{:error, conn, http_error(:status_4xx)}, state}}

      status >= 500 ->
        {:halt, {{:error, conn, http_error(:status_5xx)}, state}}

      body_too_large?(state) ->
        {:halt, {{:error, conn, http_error(:max_body_length_exceeded)}, state}}

      state.completed ->
        {:halt, {{:ok, conn, ref}, state}}

      true ->
        receive_chunk(conn, ref, state)
    end
  end

  # True when either the announced or the actually received body size exceeds
  # the limit; the latter covers responses sent without `content-length`.
  defp body_too_large?(%__MODULE__{max_body_length: max} = state) do
    state.content_length > max or state.body_length > max
  end

  # Waits for the next process message and hands it to Mint.
  #
  # NOTE(review): this matches ANY message in the caller's mailbox; messages
  # that Mint reports as `:unknown` are consumed and dropped. Consume the
  # stream from a process that does not expect unrelated messages.
  defp receive_chunk(conn, ref, state) do
    receive do
      message ->
        case Mint.HTTP.stream(conn, message) do
          {:ok, conn, responses} ->
            {chunk, state} = parse_chunk(responses, state, ref)
            {List.wrap(chunk), {{:ok, conn, ref}, state}}

          {:error, conn, error, _responses} ->
            {:halt, {{:error, conn, error}, state}}

          :unknown ->
            {[], {{:ok, conn, ref}, state}}
        end
    after
      state.receive_timeout ->
        {:halt, {{:error, conn, http_error(:receive_timeout)}, state}}
    end
  end

  # Folds the responses of this request into the state and collects body data.
  # Returns `{binaries, state}`; the data is only emitted for 2xx responses.
  defp parse_chunk(responses, state, ref, chunk \\ [])

  defp parse_chunk([{:status, ref, status} | responses], state, ref, chunk) do
    parse_chunk(responses, %{state | status: status}, ref, chunk)
  end

  defp parse_chunk([{:headers, ref, headers} | responses], state, ref, chunk) do
    # Keep earlier values when a header is absent, so a later header block
    # (e.g. trailers) does not wipe what the first one announced.
    state = %{
      state
      | content_length: content_length(headers) || state.content_length,
        redirect_location: get_header(headers, "location") || state.redirect_location
    }

    parse_chunk(responses, state, ref, chunk)
  end

  defp parse_chunk([{:data, ref, data} | responses], state, ref, chunk) do
    state = %{state | body_length: state.body_length + byte_size(data)}
    parse_chunk(responses, state, ref, [data | chunk])
  end

  defp parse_chunk([{:done, ref}], state, ref, chunk) do
    parse_chunk([], %{state | completed: true}, ref, chunk)
  end

  defp parse_chunk([], state, _ref, chunk) do
    if state.status >= 200 and state.status < 300 do
      # Data was prepended while folding; restore arrival order.
      {Enum.reverse(chunk), state}
    else
      {[], state}
    end
  end

  # Parses the `content-length` header; `nil` when missing or malformed.
  defp content_length(headers) do
    with value when is_binary(value) <- get_header(headers, "content-length"),
         {length, ""} when length >= 0 <- Integer.parse(String.trim(value)) do
      length
    else
      _ -> nil
    end
  end

  # Returns the value of the first header named exactly `key`, or `nil`.
  defp get_header(headers, key) do
    case List.keyfind(headers, key, 0) do
      {^key, value} -> value
      nil -> nil
    end
  end

  # Options for the request that follows a redirect: same caller options with
  # the redirect counter incremented.
  defp redirect_opts(%__MODULE__{} = state) do
    state
    |> Map.take(@opts)
    |> Map.put(:redirect_count, state.redirect_count + 1)
    |> Map.to_list()
  end

  # Builds the request target: path plus query. The fragment is deliberately
  # left out, since it is client-side only and must not be sent to the server.
  defp path(%URI{path: path, query: query}) do
    IO.iodata_to_binary([
      if(path in [nil, ""], do: "/", else: path),
      if(query, do: ["?", query], else: [])
    ])
  end

  @doc """
  Streams an enumerable of binaries in chunks of the same byte size.

  Every emitted chunk is exactly `bytes` long, except possibly the last one,
  which holds the remainder. Nothing is emitted for an empty input.

  Useful to stream to servers that limit chunk sizes, like AWS S3.

  ## Example

      iex> Downloader.chunk_bytes(["abcdefgh"], 3) |> Enum.to_list()
      ["abc", "def", "gh"]
  """
  @spec chunk_bytes(Enumerable.t(), pos_integer()) :: Enumerable.t()
  def chunk_bytes(enum, bytes) when is_integer(bytes) and bytes > 0 do
    # A unique end marker lets the reducer flush the remainder lazily.
    eof = make_ref()

    enum
    |> Stream.concat([eof])
    |> Stream.transform("", fn
      ^eof, "" -> {[], ""}
      ^eof, buffer -> {[buffer], ""}
      element, buffer -> split_chunks(buffer <> element, bytes, [])
    end)
  end

  # Cuts as many full `bytes`-sized chunks off `buffer` as possible.
  # Returns `{chunks_in_order, remainder}`.
  defp split_chunks(buffer, bytes, acc) when byte_size(buffer) >= bytes do
    <<chunk::binary-size(bytes), rest::binary>> = buffer
    split_chunks(rest, bytes, [chunk | acc])
  end

  defp split_chunks(buffer, _bytes, acc), do: {Enum.reverse(acc), buffer}

  @doc """
  Builds a `Mint.HTTPError` for `reason`, tagged with this module.
  """
  @spec http_error(atom()) :: Exception.t()
  def http_error(reason), do: %Mint.HTTPError{reason: reason, module: __MODULE__}

  @doc """
  Returns the human-readable message for an error reason built by `http_error/1`.
  """
  # Presumably invoked by `Mint.HTTPError` through the `module` field when the
  # exception message is formatted; confirm against the Mint version in use.
  @spec format_error(atom()) :: String.t()
  def format_error(:status_5xx), do: "response status 5xx"
  def format_error(:status_4xx), do: "response status 4xx"
  def format_error(:too_many_redirects), do: "request was redirected too many times"
  def format_error(:receive_timeout), do: "receive timeout"
  def format_error(:max_body_length_exceeded), do: "response body length exceeded the max limit"

  # Any scheme other than "https" is treated as plain HTTP.
  defp scheme_atom("https"), do: :https
  defp scheme_atom(_), do: :http
end
@alexandremcosta
Copy link
Author

alexandremcosta commented Nov 24, 2021

1: add specs
25: implement rescue version without !
45: support custom tls options
192: return status code and body

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment