Skip to content

Instantly share code, notes, and snippets.

@hubertlepicki
Last active February 21, 2023 11:24
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hubertlepicki/7be1d5c1f396c7508b153a4a39a542ef to your computer and use it in GitHub Desktop.
Save hubertlepicki/7be1d5c1f396c7508b153a4a39a542ef to your computer and use it in GitHub Desktop.
defmodule Core.HttpStreamer do
  @moduledoc """
  Functions in this module can be used to stream and consume remote files over HTTP/HTTPS.

  A remote file can be consumed line by line with a callback function (`stream_url/2`),
  as a lazy Elixir `Stream` (`stream_url/1`), or only its first line can be fetched
  (`peek_at_url/1`). This is useful, for example, for processing large CSV files and
  figuring out their headers.

  URLs that start with neither `http://` nor `https://` are treated as paths and get
  prefixed with `http://localhost:4000`.
  """

  # Base URL prepended to scheme-less URLs. This is for dev only, maybe needs cleanup.
  @dev_base_url "http://localhost:4000"

  @doc """
  Streams the body of `url` and calls `consume_line_fun` once for every line.

  Lines are delimited by `"\\n"`; the delimiter is not part of the line passed to the
  callback. Body chunks are buffered until a full line is available, so a line may span
  any number of chunks. A trailing line that is not terminated by a newline is passed to
  the callback once the stream has finished.

  Returns `{:ok, ""}` when `Finch.stream` reports success; any other result of
  `Finch.stream` is returned unchanged.
  """
  @spec stream_url(String.t(), (String.t() -> any())) :: term()
  def stream_url("https://" <> _rest = url, consume_line_fun),
    do: do_stream_url(url, consume_line_fun)

  def stream_url("http://" <> _rest = url, consume_line_fun),
    do: do_stream_url(url, consume_line_fun)

  def stream_url(url, consume_line_fun),
    do: stream_url(@dev_base_url <> url, consume_line_fun)

  @doc """
  Returns a lazy stream that emits the lines of the remote file at `url`.

  When enumeration starts, a `Core.HttpStreamer.ResourceServer` is started for `url`;
  every element is obtained from it with a `:get_line` call, and the server is stopped
  when the enumeration ends or is halted.
  """
  @spec stream_url(String.t()) :: Enumerable.t()
  def stream_url(url) do
    start_fun = fn ->
      {:ok, pid} = __MODULE__.ResourceServer.start_link(url)
      pid
    end

    next_fun = fn pid ->
      case GenServer.call(pid, :get_line, :infinity) do
        {:ok, line} -> {[line], pid}
        {:error, :done} -> {:halt, pid}
      end
    end

    after_fun = fn pid -> GenServer.stop(pid) end

    Stream.resource(start_fun, next_fun, after_fun)
  end

  @doc """
  Fetches only the first line of the remote file at `url`.

  Returns `{:ok, line}` as soon as the first `"\\n"` has been received; the transfer is
  interrupted at that point. If the stream ends before any newline is seen, the result
  of `Finch.stream` is returned as-is.
  """
  @spec peek_at_url(String.t()) :: term()
  def peek_at_url("https://" <> _rest = url), do: do_peek_at_url(url)
  def peek_at_url("http://" <> _rest = url), do: do_peek_at_url(url)
  def peek_at_url(url), do: peek_at_url(@dev_base_url <> url)

  defp do_peek_at_url(url) do
    fun = fn
      {:data, data}, line_so_far ->
        data
        |> String.split("\n")
        |> consume_first_line(line_so_far)

      # Anything that is not a body chunk leaves the buffered text untouched.
      _other, line_so_far ->
        line_so_far
    end

    try do
      :get
      |> Finch.build(url)
      |> Finch.stream(Core.HttpClient, "", fun)
    rescue
      # Raised by consume_first_line/2 to abort the transfer once the line is complete.
      e in __MODULE__.StreamingInterrupted ->
        {:ok, e.line}
    end
  end

  defp do_stream_url(url, consume_line_fun) do
    fun = fn
      {:data, data}, line_so_far ->
        data
        |> String.split("\n")
        |> consume_all_but_last_line(line_so_far, consume_line_fun)

      # Anything that is not a body chunk leaves the buffered text untouched.
      _other, line_so_far ->
        line_so_far
    end

    result =
      :get
      |> Finch.build(url)
      |> Finch.stream(Core.HttpClient, "", fun)

    case result do
      {:ok, ""} ->
        result

      # The body did not end with a newline: hand over the last, unterminated line too.
      {:ok, trailing_line} ->
        consume_line_fun.(trailing_line)
        {:ok, ""}

      other ->
        other
    end
  end

  # The last fragment of a chunk is not newline-terminated yet, so it is kept (together
  # with whatever was buffered before it) and returned as the new accumulator.
  defp consume_all_but_last_line([last_line], line_so_far, _consume_line_fun),
    do: line_so_far <> last_line

  defp consume_all_but_last_line([first_line | rest], line_so_far, consume_line_fun) do
    consume_line_fun.(line_so_far <> first_line)
    consume_all_but_last_line(rest, "", consume_line_fun)
  end

  # A single fragment means the chunk held no newline: keep buffering.
  defp consume_first_line([partial_line], line_so_far), do: line_so_far <> partial_line

  # Two or more fragments mean the first line is complete: abort the transfer and carry
  # the line out through the exception.
  defp consume_first_line([first_line | _rest], line_so_far) do
    raise __MODULE__.StreamingInterrupted, line: line_so_far <> first_line
  end

  defmodule StreamingInterrupted do
    @moduledoc """
    Raised to stop an in-progress transfer; `line` carries the line read so far.
    """
    defexception message: "streaming interrupted", line: ""
  end
end
defmodule Core.HttpStreamer.ResourceServer do
  @moduledoc """
  Buffers lines between a transport process and a single consumer.

  The transport delivers lines with `{:put_lines, lines}` calls and signals the end of
  the transfer with `:done`; the consumer asks for one line at a time with `:get_line`.

  A `{:put_lines, lines}` call is not answered right away: its `from` is stored under
  `:transport` and answered with `:ok` only when the consumer asks for a line while the
  buffer is empty. Likewise, a `:get_line` call arriving at an empty buffer is parked
  under `:caller` until a line (or `:done`) arrives.

  State keys: `:url`, `:lines` (buffered lines, oldest first), `:done`, `:transport`
  (parked transport call or `nil`) and `:caller` (parked consumer call or `nil`).
  """
  use GenServer

  @doc """
  Starts the server for `url`; the transport is started right after initialisation.
  """
  def start_link(url) do
    GenServer.start_link(__MODULE__, url)
  end

  @impl true
  def init(url) do
    {:ok, %{url: url, lines: [], done: false, transport: nil, caller: nil}, {:continue, :init}}
  end

  @impl true
  def handle_continue(:init, %{url: url} = state) do
    {:ok, _pid} = Core.HttpStreamer.TransportServer.start_link(url)
    {:noreply, state}
  end

  # a line is already buffered, hand it out immediately
  @impl true
  def handle_call(:get_line, _from, %{lines: [line | rest]} = state) do
    {:reply, {:ok, line}, %{state | lines: rest, caller: nil}}
  end

  # buffer is empty and the transport has finished, nothing more will arrive
  def handle_call(:get_line, _from, %{lines: [], done: true} = state) do
    {:reply, {:error, :done}, %{state | caller: nil}}
  end

  # postpone returning a line until at least one is in the buffer
  def handle_call(:get_line, from, %{lines: [], done: false} = state) do
    # Release the parked transport call, if any, so it can deliver its next line.
    if state.transport do
      GenServer.reply(state.transport, :ok)
    end

    # The transport reference has been answered; drop it so it is never answered twice.
    {:noreply, %{state | caller: from, transport: nil}}
  end

  # no process is waiting for a reply, just fill in the buffer and wait
  def handle_call({:put_lines, lines}, from, %{caller: nil} = state) do
    {:noreply, %{state | lines: state.lines ++ lines, transport: from}}
  end

  # there is a process waiting for a reply, answer it and update the buffer
  def handle_call({:put_lines, lines}, from, %{caller: caller} = state) do
    [line | rest] = state.lines ++ lines
    GenServer.reply(caller, {:ok, line})
    # The caller has been answered; clear it so later messages do not reply to it again.
    {:noreply, %{state | lines: rest, transport: from, caller: nil}}
  end

  # transport finished its work and there is no caller waiting for another line yet
  def handle_call(:done, _from, %{caller: nil} = state) do
    {:reply, :ok, %{state | done: true}}
  end

  # transport finished its work, but there is a caller waiting for another line
  def handle_call(:done, _from, %{caller: caller} = state) do
    GenServer.reply(caller, {:error, :done})
    {:reply, :ok, %{state | done: true, caller: nil}}
  end
end
defmodule Core.HttpStreamer.TransportServer do
  @moduledoc """
  Linked worker process that downloads a URL and feeds its lines to the process that
  started it.

  Every line is delivered with a `{:put_lines, [line]}` call and the end of the transfer
  is signalled with a `:done` call. Both calls wait indefinitely for an `:ok` reply, so
  the receiving process controls the pace of the download.
  """

  @doc """
  Spawns the worker linked to the calling process and returns `{:ok, pid}`.
  """
  def start_link(url) do
    owner = self()
    {:ok, spawn_link(fn -> run(url, owner) end)}
  end

  # Streams the whole URL line by line, then tells the owner that nothing more will come.
  defp run(url, owner) do
    Core.HttpStreamer.stream_url(url, &push_line(owner, &1))
    :ok = GenServer.call(owner, :done, :infinity)
  end

  # Blocks until the owner acknowledges the line.
  defp push_line(owner, line) do
    :ok = GenServer.call(owner, {:put_lines, [line]}, :infinity)
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment