Last active
March 19, 2024 15:44
-
-
Save ollien/90eb83d306f1fe1bfef01f553caef7fd to your computer and use it in GitHub Desktop.
HTTPoison Stream adapter; inspired by https://www.poeticoding.com/elixir-streams-to-process-large-http-responses-on-the-fly/
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule HttpStream do
  @moduledoc """
  Adapts HTTPoison's asynchronous (`stream_to`) responses into a lazy stream.

  A small `GenServer` sits between HTTPoison and the stream consumer. HTTPoison
  delivers its async messages to the server as plain process messages; the
  stream pulls them out one at a time with a `:get_msg` call.

  Server state:

    * `:pending_recipient` - the `from` of a caller blocked in `:get_msg`
      waiting for the next message, or `nil` when nobody is waiting.
    * `:pending_messages` - an Erlang `:queue` of messages that arrived while
      nobody was waiting.
    * `:num_pending_messages` - the number of entries in `:pending_messages`.
  """

  use GenServer

  @doc """
  Stream an http response.

  The given function `make_request` should accept a PID and return a
  `HTTPoison.AsyncResponse`. The given PID is intended to be passed to `stream_to`
  in `HTTPoison.request/5`. The optional `message_timeout` argument can be used to
  configure the timeout on a per-message basis (i.e. it is not a timeout for the
  full stream, but rather for each chunk received).

  The returned stream will return the same messages as HTTPoison's `stream_to`.

  The intermediary server is started when the stream is enumerated (not when it is
  built) and is stopped when enumeration ends, so each enumeration calls
  `make_request` again with a fresh PID.
  """
  @spec response_stream((pid() -> HTTPoison.AsyncResponse.t()), timeout()) :: Enumerable.t()
  def response_stream(make_request, message_timeout \\ 5000) do
    Stream.resource(
      fn ->
        {:ok, server} = start_link(nil)

        try do
          {server, make_request.(server)}
        catch
          kind, reason ->
            # Stream.resource/3 does not run the cleanup function when the start
            # function fails, so stop the server here before re-raising.
            stop_server(server)
            :erlang.raise(kind, reason, __STACKTRACE__)
        end
      end,
      fn
        {_server, :finished} = acc ->
          {:halt, acc}

        {server, response} ->
          case get_msg(server, message_timeout) do
            %HTTPoison.AsyncEnd{} = msg ->
              {[msg], {server, :finished}}

            msg ->
              # Can't fail unless this response is orphaned, in which case something
              # has really gone wrong
              {:ok, next_response} = HTTPoison.stream_next(response)
              {[msg], {server, next_response}}
          end
      end,
      fn {server, _response} -> stop_server(server) end
    )
  end

  # Blocks until the server has a message to hand over, or exits after `timeout`.
  defp get_msg(server, timeout) do
    GenServer.call(server, :get_msg, timeout)
  end

  # Stops the intermediary server, tolerating the case where it is already gone
  # (it stops itself after handing out the `HTTPoison.AsyncEnd` message).
  defp stop_server(server) do
    GenServer.stop(server)
  catch
    # If the server is already stopped, we're ok
    :exit, {:noproc, {GenServer, :stop, [^server | _rest]}} -> :ok
  end

  # Not to be called by consumers. This is only exposed as an implementation detail.
  @doc false
  def start_link(_nil) do
    GenServer.start_link(__MODULE__, nil)
  end

  @impl GenServer
  def init(_nil) do
    {:ok, %{pending_recipient: nil, pending_messages: :queue.new(), num_pending_messages: 0}}
  end

  # Nobody is waiting: store the message until a recipient becomes available.
  @impl GenServer
  def handle_info(msg, %{pending_recipient: nil} = state) do
    {:noreply, enqueue(state, msg)}
  end

  # Somebody is waiting and nothing is queued: thread the message straight through.
  def handle_info(msg, %{num_pending_messages: 0} = state) do
    GenServer.reply(state.pending_recipient, msg)
    # Recipient has gotten their reply, clear them out
    {:noreply, %{state | pending_recipient: nil}, {:continue, msg}}
  end

  # Somebody is waiting while older messages are queued: hand over the oldest one
  # and queue the new arrival behind the remaining ones.
  def handle_info(msg, state) do
    {reply_msg, state_after_reply} = pending_message_for_reply(state)
    GenServer.reply(state.pending_recipient, reply_msg)

    # Enqueue onto the queue that no longer contains `reply_msg`; reusing the
    # pre-pop queue would deliver `reply_msg` a second time.
    next_state =
      state_after_reply
      |> enqueue(msg)
      |> Map.put(:pending_recipient, nil)

    {:noreply, next_state, {:continue, reply_msg}}
  end

  # Nothing queued yet: remember the caller and reply once a message arrives.
  @impl GenServer
  def handle_call(:get_msg, from, %{num_pending_messages: 0} = state) do
    {:noreply, %{state | pending_recipient: from}}
  end

  # A message is already queued: reply with the oldest one immediately.
  def handle_call(:get_msg, _from, %{num_pending_messages: num_pending_messages} = state)
      when num_pending_messages > 0 do
    {reply_msg, next_state} = pending_message_for_reply(state)
    {:reply, reply_msg, next_state, {:continue, reply_msg}}
  end

  # Runs after a message has been handed to the consumer: once the end-of-response
  # marker has been delivered the server stops itself.
  @impl GenServer
  def handle_continue(%HTTPoison.AsyncEnd{}, state) do
    {:stop, :normal, state}
  end

  def handle_continue(_sent_msg, state) do
    {:noreply, state}
  end

  # Appends `msg` to the pending queue and bumps the counter.
  defp enqueue(state, msg) do
    %{
      state
      | pending_messages: :queue.in(msg, state.pending_messages),
        num_pending_messages: state.num_pending_messages + 1
    }
  end

  # Pops the oldest pending message, returning it with the updated state.
  # Only valid when at least one message is queued.
  defp pending_message_for_reply(%{num_pending_messages: num_pending_messages} = state)
       when num_pending_messages > 0 do
    {{:value, next_msg}, pending_messages} = :queue.out(state.pending_messages)

    next_state = %{
      state
      | num_pending_messages: num_pending_messages - 1,
        pending_messages: pending_messages
    }

    {next_msg, next_state}
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment