Skip to content

Instantly share code, notes, and snippets.

@zachallaun
Last active April 23, 2023 20:48
Show Gist options
  • Save zachallaun/88aed2a0cef0aed6d68dcc7c12531649 to your computer and use it in GitHub Desktop.
Save zachallaun/88aed2a0cef0aed6d68dcc7c12531649 to your computer and use it in GitHub Desktop.
Livebook notebook demonstrating how streaming responses might be implemented

Req Stream

# Resolve the local Req checkout to an absolute path; the notebook installs
# Req from this directory instead of fetching a published package.
# NOTE(review): presumably needed for unreleased Req features — confirm.
local_req = Path.expand("~/dev/req")

Mix.install([{:req, path: local_req}])
:ok

Finch stream

This module is a simple proof of concept that uses `Finch.stream/5` and `Stream.resource/3` to expose the response body as a stream. It doesn't currently handle request cancellation.

defmodule FinchStream do
  @moduledoc """
  Uses Finch's lower-level `stream` API to expose an HTTP
  response as a Stream.

  The request runs inside a `Task` started with `Task.async/1`, which
  forwards every chunk as a message to the process that called `stream!/3`.
  The returned stream therefore has to be enumerated by that same process,
  and it can only be enumerated once.

  ## Usage

      Finch.start_link(name: MyFinch)

      {status, headers, stream} =
        Finch.build(:get, "https://example.com")
        |> FinchStream.stream!(MyFinch)

  """

  @doc """
  Starts `request` on the `finch` pool and returns `{status, headers, stream}`.

  Blocks until the response status and headers have been received. `stream`
  lazily yields the body chunks in arrival order. `opts` is passed through
  unchanged to `Finch.stream/5`.

  Raises if Finch reports an error, either before the status/headers arrive
  (raised from this function) or while the body is being consumed (raised
  from the stream enumeration).
  """
  def stream!(request, finch, opts \\ []) do
    IO.puts("starting stream")
    me = self()
    ref = make_ref()

    task =
      Task.async(fn ->
        on_chunk = fn chunk, _acc -> send(me, {:chunk, chunk, ref}) end

        # Forward the outcome instead of discarding it, so the consumer can
        # tell a completed response apart from a failed one. The 3-tuple
        # error shape is accepted as well in case the Finch version in use
        # returns the accumulator alongside the error.
        case Finch.stream(request, finch, nil, on_chunk, opts) do
          {:ok, _acc} -> send(me, {:done, ref})
          {:error, reason} -> send(me, {:error, reason, ref})
          {:error, reason, _acc} -> send(me, {:error, reason, ref})
        end
      end)

    state = {ref, task}
    status = await_head!(:status, state)
    headers = await_head!(:headers, state)

    stream =
      Stream.resource(
        fn -> state end,
        &next_fun/1,
        &after_fun/1
      )

    {status, headers, stream}
  end

  # Waits for the `{kind, value}` chunk (`:status` or `:headers`). If the
  # request ends or fails first, the task is cleaned up and an error is
  # raised rather than blocking the caller forever.
  defp await_head!(kind, {ref, _task} = state) do
    receive do
      {:chunk, {^kind, value}, ^ref} ->
        value

      {:error, reason, ^ref} ->
        cleanup(state)
        raise_stream_error(reason)

      {:done, ^ref} ->
        cleanup(state)
        raise "Finch stream finished before the response #{kind} was received"
    end
  end

  defp next_fun({ref, task}) do
    IO.puts("awaiting data")

    receive do
      {:chunk, {:data, data}, ^ref} ->
        {[data], {ref, task}}

      # Any other chunk kind carries no body data; consume it so it does not
      # linger in the mailbox, and emit nothing.
      {:chunk, _other, ^ref} ->
        {[], {ref, task}}

      {:done, ^ref} ->
        {:halt, {ref, task}}

      # Raising here surfaces a mid-body failure to the consumer instead of
      # ending the stream as if the body were complete. `Stream.resource/3`
      # still invokes `after_fun/1` when the next function raises.
      {:error, reason, ^ref} ->
        raise_stream_error(reason)
    end
  end

  defp after_fun(state) do
    IO.puts("done")
    cleanup(state)
  end

  # Stops the task and then drops every message it already sent, so halting
  # early (e.g. `Enum.take/2`) does not leave chunks behind in the mailbox.
  defp cleanup({ref, task}) do
    Task.shutdown(task)
    flush(ref)
  end

  defp flush(ref) do
    receive do
      {:chunk, _chunk, ^ref} -> flush(ref)
      {:done, ^ref} -> flush(ref)
      {:error, _reason, ^ref} -> flush(ref)
    after
      0 -> :ok
    end
  end

  defp raise_stream_error(reason) when is_exception(reason), do: raise(reason)
  defp raise_stream_error(reason), do: raise("Finch stream failed: " <> inspect(reason))
end
{:module, FinchStream, <<70, 79, 82, 49, 0, 0, 14, ...>>, {:after_fun, 1}}

Req plugin

defmodule ReqStream do
  @moduledoc """
  Req plugin that adds a `:stream` option.

  When the option is truthy, the request is executed through
  `FinchStream.stream!/3` and the response body is `{:stream, stream}`.
  """

  @doc """
  Registers the `:stream` option on `req` and appends the request step that
  acts on it.
  """
  def attach(req) do
    with_option = Req.Request.register_options(req, [:stream])
    Req.Request.append_request_steps(with_option, stream: &stream_request/1)
  end

  # Request step: swaps in the streaming executor only when `:stream` is
  # truthy; otherwise the request passes through untouched.
  defp stream_request(request) do
    case request.options[:stream] do
      disabled when disabled in [nil, false] ->
        request

      _enabled ->
        Req.Request.merge_options(request, finch_exec_request: &exec_stream_request/3)
    end
  end

  # Runs the Finch request via FinchStream and wraps the resulting parts in
  # a response struct whose body is tagged as a stream.
  defp exec_stream_request(finch_request, finch_name, finch_opts) do
    {status, headers, body_stream} =
      FinchStream.stream!(finch_request, finch_name, finch_opts)

    %Req.Response{status: status, headers: headers, body: {:stream, body_stream}}
  end
end
{:module, ReqStream, <<70, 79, 82, 49, 0, 0, 11, ...>>, {:exec_stream_request, 3}}
# Build the request, attach the streaming plugin, then issue a GET with the
# plugin's `:stream` option turned on.
response =
  [url: "https://sse.dev/test"]
  |> Req.new()
  |> ReqStream.attach()
  |> Req.get!(stream: true)
starting stream
%Req.Response{
  status: 200,
  headers: [
    {"date", "Sun, 23 Apr 2023 20:47:51 GMT"},
    {"server", "Apache/2.4.52 (Unix) OpenSSL/1.1.1n"},
    {"access-control-allow-origin", "*"},
    {"cache-control", "no-cache"},
    {"content-type", "text/event-stream"},
    {"transfer-encoding", "chunked"}
  ],
  body: {:stream, #Function<52.57817549/2 in Stream.resource/3>},
  private: %{}
}
# The plugin tags the body as `{:stream, stream}`; unwrap it before use.
{:stream, stream} = response.body

# Print each chunk as it arrives.
Enum.map(stream, &IO.inspect/1)
awaiting data
"data: {\"testing\":true,\"sse_dev\":\"is great\",\"msg\":\"It works!\",\"now\":1682282871288}\n\n"
awaiting data
"data: {\"testing\":true,\"sse_dev\":\"is great\",\"msg\":\"It works!\",\"now\":1682282873288}\n\n"
awaiting data
"data: {\"testing\":true,\"sse_dev\":\"is great\",\"msg\":\"It works!\",\"now\":1682282875288}\n\n"
awaiting data
"data: {\"testing\":true,\"sse_dev\":\"is great\",\"msg\":\"It works!\",\"now\":1682282877288}\n\n"
awaiting data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment