Skip to content

Instantly share code, notes, and snippets.

@ollien
Last active March 19, 2024 15:44
Show Gist options
  • Save ollien/90eb83d306f1fe1bfef01f553caef7fd to your computer and use it in GitHub Desktop.
Save ollien/90eb83d306f1fe1bfef01f553caef7fd to your computer and use it in GitHub Desktop.
defmodule HttpStream do
  @moduledoc """
  Exposes an asynchronous HTTPoison response as a lazy stream.

  A small `GenServer` is used as the `stream_to` target of the request. It buffers the
  messages HTTPoison sends until the stream consumer asks for them, and hands them over
  one at a time in the order they arrived.
  """

  use GenServer

  @doc """
  Stream an http response.

  The given function `make_request` should accept a PID and return a
  `HTTPoison.AsyncResponse`. The given PID is intended to be passed to `stream_to` in
  `HTTPoison.request/5`. Because `HTTPoison.stream_next/1` is called after every message,
  the request is presumably expected to be made with `async: :once`.

  The optional `message_timeout` argument can be used to configure the timeout on a
  per-message basis (i.e. it is not a timeout for the full stream, but rather for each
  chunk received). If no message arrives in time, the enumerating process exits, as with
  any timed-out `GenServer.call/3`.

  The returned stream yields the same messages as HTTPoison's `stream_to`. It ends after
  a `HTTPoison.AsyncEnd` or a `HTTPoison.Error` message has been emitted. The buffering
  process is started when the stream is enumerated and stopped when enumeration finishes,
  halts early, or fails.
  """
  @spec response_stream((pid() -> HTTPoison.AsyncResponse.t()), timeout()) :: Enumerable.t()
  def response_stream(make_request, message_timeout \\ 5000) do
    Stream.resource(
      fn -> start_request(make_request) end,
      fn
        {server, :finished} ->
          {:halt, {server, :finished}}

        {server, response} ->
          case get_msg(server, message_timeout) do
            %HTTPoison.AsyncEnd{} = msg ->
              {[msg], {server, :finished}}

            # An error is the last message HTTPoison sends for a request; there is no
            # next chunk to ask for, so emit it and end the stream.
            %HTTPoison.Error{} = msg ->
              {[msg], {server, :finished}}

            msg ->
              # Can't fail unless this response is orphaned, in which case something
              # has really gone wrong
              {:ok, next_response} = HTTPoison.stream_next(response)
              {[msg], {server, next_response}}
          end
      end,
      fn {server, _response} -> stop_server(server) end
    )
  end

  # Starts the buffering server and issues the request against it. The server is started
  # here (rather than when the stream is built) so every enumeration gets its own server
  # and nothing is started for a stream that is never run.
  defp start_request(make_request) do
    {:ok, server} = GenServer.start_link(__MODULE__, nil)

    try do
      {server, make_request.(server)}
    catch
      kind, reason ->
        # Stream.resource/3 does not run its cleanup function when the start function
        # fails, so stop the server here before re-raising.
        stop_server(server)
        :erlang.raise(kind, reason, __STACKTRACE__)
    end
  end

  # Stops the buffering server, tolerating the case where it has already stopped itself
  # after delivering the final message.
  defp stop_server(server) do
    GenServer.stop(server)
  catch
    # If the server is already stopped, we're ok
    :exit, {:noproc, {GenServer, :stop, [^server | _rest]}} -> :ok
  end

  # Blocks until the server has a message to hand over, or exits after `timeout`.
  defp get_msg(server, timeout) do
    GenServer.call(server, :get_msg, timeout)
  end

  @doc """
  Not to be called by consumers. This is only exposed as an implementation detail.

  Starts the buffering server linked to the calling process.
  """
  @spec start_link(term()) :: GenServer.on_start()
  def start_link(_nil) do
    GenServer.start_link(__MODULE__, nil)
  end

  # State:
  #   * `:pending_recipient`    - the `from` of a blocked `:get_msg` caller, or `nil`
  #   * `:pending_messages`     - `:queue` of messages received but not yet delivered
  #   * `:num_pending_messages` - number of entries in `:pending_messages`
  @impl GenServer
  def init(_nil) do
    {:ok, %{pending_recipient: nil, pending_messages: :queue.new(), num_pending_messages: 0}}
  end

  @impl GenServer
  def handle_info(msg, %{pending_recipient: nil} = state) do
    # Nobody is waiting: store the message until a recipient becomes available
    next_state = %{
      state
      | pending_messages: :queue.in(msg, state.pending_messages),
        num_pending_messages: state.num_pending_messages + 1
    }

    {:noreply, next_state}
  end

  def handle_info(msg, %{num_pending_messages: 0} = state) do
    # A caller is waiting and nothing is buffered: thread the message straight through
    GenServer.reply(state.pending_recipient, msg)
    # Recipient has gotten their reply, clear them out. The continue lets the server
    # shut itself down once the final message has been delivered.
    {:noreply, %{state | pending_recipient: nil}, {:continue, msg}}
  end

  def handle_info(msg, state) do
    # A caller is waiting and older messages are buffered: deliver the oldest one and
    # queue the new message behind whatever remains.
    {reply_msg, state_after_reply} = pending_message_for_reply(state)
    GenServer.reply(state.pending_recipient, reply_msg)

    next_state = %{
      state_after_reply
      | # Enqueue onto the queue the reply was removed from; using the pre-reply queue
        # here would deliver `reply_msg` a second time.
        pending_messages: :queue.in(msg, state_after_reply.pending_messages),
        num_pending_messages: state_after_reply.num_pending_messages + 1,
        # Recipient has gotten their reply, clear them out
        pending_recipient: nil
    }

    {:noreply, next_state}
  end

  @impl GenServer
  def handle_call(:get_msg, from, %{num_pending_messages: 0} = state) do
    # Nothing buffered yet: remember the caller and reply once a message arrives
    {:noreply, %{state | pending_recipient: from}}
  end

  def handle_call(:get_msg, _from, %{num_pending_messages: num_pending_messages} = state)
      when num_pending_messages > 0 do
    {reply_msg, next_state} = pending_message_for_reply(state)
    {:reply, reply_msg, next_state, {:continue, reply_msg}}
  end

  # Invoked with the message that was just delivered to the consumer.
  @impl GenServer
  def handle_continue(%HTTPoison.AsyncEnd{}, state) do
    # The end-of-response marker has been handed over; there is nothing left to buffer
    {:stop, :normal, state}
  end

  def handle_continue(_sent_msg, state) do
    {:noreply, state}
  end

  # Removes the oldest buffered message, returning it together with the updated state.
  # Only defined for states that hold at least one buffered message.
  defp pending_message_for_reply(%{num_pending_messages: num_pending_messages} = state)
       when num_pending_messages > 0 do
    {{:value, next_msg}, pending_messages} = :queue.out(state.pending_messages)

    next_state = %{
      state
      | num_pending_messages: num_pending_messages - 1,
        pending_messages: pending_messages
    }

    {next_msg, next_state}
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment