-
-
Save hubertlepicki/7be1d5c1f396c7508b153a4a39a542ef to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Core.HttpStreamer do
  @moduledoc """
  Functions in this module can be used to stream and consume remote files over HTTP/HTTPS:

    * line by line with a callback function (`stream_url/2`),
    * as a lazy Elixir `Stream` of lines (`stream_url/1`),
    * or by fetching just the first line of the remote file (`peek_at_url/1`).

  This is useful for example for processing large CSV files and figuring out their headers.

  Lines are split on the line-feed character only, and the HTTP status and headers of the
  response are ignored: only `{:data, chunk}` events are inspected.
  """

  @doc """
  Streams the body of `url` and calls `consume_line_fun` once for each line, in order.

  A line may arrive split across several response chunks; the pieces are joined before the
  callback is invoked. A final line that is not terminated by a newline is delivered too.

  A `url` that does not start with `http://` or `https://` is treated as a path and
  prefixed with `http://localhost:4000` (development convenience).

  Returns the result of `Finch.stream/5`.
  """
  def stream_url("https://" <> _rest = url, consume_line_fun),
    do: do_stream_url(url, consume_line_fun)

  def stream_url("http://" <> _rest = url, consume_line_fun),
    do: do_stream_url(url, consume_line_fun)

  # this is for dev only, maybe needs cleanup
  def stream_url(url, consume_line_fun),
    do: stream_url("http://localhost:4000" <> url, consume_line_fun)

  @doc """
  Returns a lazy stream that emits the lines of the file at `url`.

  Nothing is started until the stream is enumerated. On enumeration a
  `Core.HttpStreamer.ResourceServer` is started and asked for one line at a time; it is
  stopped when the enumeration finishes or is halted.
  """
  @spec stream_url(String.t()) :: Enumerable.t()
  def stream_url(url) do
    init_fun = fn ->
      {:ok, pid} = __MODULE__.ResourceServer.start_link(url)
      pid
    end

    next_fun = fn pid ->
      case GenServer.call(pid, :get_line, :infinity) do
        {:ok, line} -> {[line], pid}
        {:error, :done} -> {:halt, pid}
      end
    end

    after_fun = fn pid -> GenServer.stop(pid) end

    Stream.resource(init_fun, next_fun, after_fun)
  end

  @doc """
  Fetches only the first line of the file at `url`.

  The download is interrupted as soon as the first newline has been received. Returns
  `{:ok, first_line}`; when the body contains no newline at all, the whole body is the
  first line. If the request itself fails, the result of `Finch.stream/5` is returned as is.

  A `url` without an `http://` or `https://` prefix is resolved against
  `http://localhost:4000`, like in `stream_url/2`.
  """
  def peek_at_url("https://" <> _rest = url), do: do_peek_at_url(url)
  def peek_at_url("http://" <> _rest = url), do: do_peek_at_url(url)
  def peek_at_url(url), do: peek_at_url("http://localhost:4000" <> url)

  defp do_peek_at_url(url) do
    fun = fn
      {:data, data}, line_so_far ->
        data
        |> String.split("\n")
        |> consume_first_line(line_so_far)

      _other, line_so_far ->
        line_so_far
    end

    # The streaming callback has no way to ask for an early stop, so the first complete
    # line is carried out of the request by raising StreamingInterrupted.
    try do
      Finch.build(:get, url) |> Finch.stream(Core.HttpClient, "", fun)
    rescue
      e in __MODULE__.StreamingInterrupted ->
        {:ok, e.line}
    end
  end

  defp do_stream_url(url, consume_line_fun) do
    fun = fn
      {:data, data}, line_so_far ->
        data
        |> String.split("\n")
        |> consume_all_but_last_line(line_so_far, consume_line_fun)

      _other, line_so_far ->
        line_so_far
    end

    Finch.build(:get, url)
    |> Finch.stream(Core.HttpClient, "", fun)
    |> flush_trailing_line(consume_line_fun)
  end

  # The accumulator left over after a successful request is the text that followed the
  # last newline. When it is not empty the body did not end with a newline, and that
  # text is a real line that must be delivered as well.
  defp flush_trailing_line({:ok, trailing} = result, consume_line_fun)
       when is_binary(trailing) and trailing != "" do
    consume_line_fun.(trailing)
    result
  end

  defp flush_trailing_line(result, _consume_line_fun), do: result

  # Single element left: it is the (possibly empty) unterminated tail of the chunk.
  # It has to be appended to what was accumulated so far, otherwise a line that spans
  # three or more chunks would lose its beginning.
  defp consume_all_but_last_line([last_line], line_so_far, _consume_line_fun),
    do: line_so_far <> last_line

  # Every element except the last one was terminated by a newline, so it completes a line.
  defp consume_all_but_last_line([first_line | rest], line_so_far, consume_line_fun) do
    consume_line_fun.(line_so_far <> first_line)
    consume_all_but_last_line(rest, "", consume_line_fun)
  end

  # The chunk contained no newline: the first line is still incomplete, keep accumulating.
  defp consume_first_line([partial], line_so_far), do: line_so_far <> partial

  # The chunk contained a newline: the first line is complete, abort the download.
  defp consume_first_line([first | _rest], line_so_far) do
    raise __MODULE__.StreamingInterrupted, line: line_so_far <> first
  end

  defmodule StreamingInterrupted do
    @moduledoc """
    Raised internally to stop a download once the first line is known; `line` carries it.
    """
    defexception message: "streaming interrupted", line: ""
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Core.HttpStreamer.ResourceServer do
  @moduledoc """
  Buffer process sitting between the transport process that downloads a file and the
  consumer that reads it one line at a time.

  State keys:

    * `:lines` - lines received from the transport and not yet handed to the consumer,
    * `:done` - `true` once the transport has reported the end of the download,
    * `:transport` - reply handle of a `{:put_lines, _}` call that has not been answered
      yet; leaving it unanswered keeps the transport blocked until more lines are needed,
    * `:caller` - reply handle of a `:get_line` call that is waiting for a line.

  Both handles are reset to `nil` as soon as they have been answered, so the same call is
  never replied to twice.
  """
  use GenServer

  @doc """
  Starts the server (linked to the calling process) for the file at `url`.
  """
  def start_link(url) do
    GenServer.start_link(__MODULE__, url)
  end

  @impl true
  def init(url) do
    {:ok, %{url: url, lines: [], done: false, transport: nil, caller: nil}, {:continue, :init}}
  end

  # The transport is started from inside this process because it captures `self()` as
  # the destination of its `{:put_lines, _}` and `:done` calls.
  @impl true
  def handle_continue(:init, %{url: url} = state) do
    {:ok, _pid} = Core.HttpStreamer.TransportServer.start_link(url)
    {:noreply, state}
  end

  # a line is already buffered, hand it out right away
  @impl true
  def handle_call(:get_line, _from, %{lines: [line | rest]} = state) do
    {:reply, {:ok, line}, %{state | lines: rest, caller: nil}}
  end

  # buffer is empty and the transport has finished, nothing more will come
  def handle_call(:get_line, _from, %{lines: [], done: true} = state) do
    {:reply, {:error, :done}, %{state | caller: nil}}
  end

  # postpone returning line until at least one is in the buffer
  def handle_call(:get_line, from, %{lines: [], done: false} = state) do
    # Releasing the blocked transport lets it push the next line.
    if state.transport do
      GenServer.reply(state.transport, :ok)
    end

    {:noreply, %{state | caller: from, transport: nil}}
  end

  # no process is waiting for a reply, just fill in the buffer and wait
  def handle_call({:put_lines, lines}, from, %{caller: nil} = state) do
    {:noreply, %{state | lines: state.lines ++ lines, transport: from}}
  end

  # there is a process waiting for a reply, do it and update buffer
  def handle_call({:put_lines, lines}, from, %{caller: caller} = state) do
    [line | rest] = state.lines ++ lines
    GenServer.reply(caller, {:ok, line})
    {:noreply, %{state | lines: rest, transport: from, caller: nil}}
  end

  # transport finished its work and there is no caller waiting for another line yet
  def handle_call(:done, _from, %{caller: nil} = state) do
    {:reply, :ok, %{state | done: true}}
  end

  # transport finished its work, but there is a caller waiting for another line
  def handle_call(:done, _from, %{caller: caller} = state) do
    GenServer.reply(caller, {:error, :done})
    {:reply, :ok, %{state | done: true, caller: nil}}
  end
end
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Core.HttpStreamer.TransportServer do
  @moduledoc """
  Downloads a file in a separate linked process and pushes it, line by line, to the
  process that started it.

  Despite its name this is a plain process created with `spawn_link/1`, not a `GenServer`.
  """

  @doc """
  Spawns the download process for `url`, linked to the caller, and returns `{:ok, pid}`.

  The calling process is the one that receives the `{:put_lines, [line]}` and `:done`
  calls, so it has to be a process able to answer `GenServer.call/3` requests.
  """
  def start_link(url) do
    owner = self()
    {:ok, spawn_link(fn -> run(url, owner) end)}
  end

  # Streams the whole file to `owner`, then tells it that nothing more will come.
  defp run(url, owner) do
    Core.HttpStreamer.stream_url(url, fn line -> push_line(owner, line) end)
    :ok = GenServer.call(owner, :done, :infinity)
  end

  # Blocks until `owner` acknowledges the line, however long that takes.
  defp push_line(owner, line) do
    :ok = GenServer.call(owner, {:put_lines, [line]}, :infinity)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment