defmodule Buffer do
  @moduledoc """
  Buffer the data received to parse out statements.

  The server state is a single binary holding the bytes that have not yet been
  terminated by a line feed. Whenever data arrives it is appended to that
  binary, every complete line-feed-terminated statement is passed to
  `MessageSink.receive/2` together with `Timex.now/0`, and the unterminated
  remainder becomes the new state.
  """

  use GenServer
  require Logger

  # Statement terminator: a single line feed byte.
  @eol <<10>>
  @initial_state ""

  ## Client API

  @doc """
  Starts a buffer process linked to the caller, holding an empty buffer.

  Returns the result of `GenServer.start_link/2`. The process is not
  registered under a name.
  """
  def create do
    GenServer.start_link(__MODULE__, @initial_state)
  end

  @doc """
  Asynchronously hands `data` to the buffer process `pid`.

  `pid` defaults to the name `Buffer`. `create/0` does not register that name,
  so the one-argument form only reaches a process that has been registered as
  `Buffer` by some other means. Returns `:ok` (the result of `GenServer.cast/2`).
  """
  def receive(pid \\ __MODULE__, data) do
    GenServer.cast(pid, {:receive, data})
  end

  ## Server callbacks

  # Defined explicitly instead of relying on the default implementation that
  # `use GenServer` injects (and warns about) when `init/1` is missing.
  @impl true
  def init(buffer), do: {:ok, buffer}

  @impl true
  def handle_cast({:receive, data}, buffer) do
    buffer
    |> append(data)
    |> process()
  end

  ## Helpers

  # Appends newly received data to the pending bytes; empty data is a no-op.
  defp append(buffer, ""), do: buffer
  defp append(buffer, data), do: buffer <> data

  # Emits complete statements one at a time until only an unterminated
  # remainder is left, then returns the GenServer reply carrying that remainder
  # as the new state.
  defp process(buffer) do
    case extract(buffer) do
      {:statement, rest, statement} ->
        MessageSink.receive(statement, Timex.now())
        process(rest)

      {:nothing, rest} ->
        {:noreply, rest}
    end
  end

  # Splits off the first line-feed-terminated statement, if there is one.
  defp extract(buffer) do
    case String.split(buffer, @eol, parts: 2) do
      [statement, rest] -> {:statement, rest, statement}
      [rest] -> {:nothing, rest}
    end
  end
end
defmodule MessageSink do
  @moduledoc "Prints received messages to standard output, prefixed with a timestamp."

  @doc """
  Writes `message` to standard output on one line, preceded by `time`
  formatted as `YYYY-MM-DD HH:MM:SS` and a single space.

  Returns the result of `IO.puts/1`.
  """
  def receive(message, time) do
    stamp = Timex.format!(time, "%Y-%m-%d %H:%M:%S", :strftime)
    line = stamp <> " " <> to_string(message)
    IO.puts(line)
  end
end
defmodule Receiver do
  @moduledoc """
  Minimal TCP receiver.

  Listens on a port, accepts connections in a loop and, for every connection,
  spawns a process that reads from the socket in passive mode and forwards the
  received bytes to a `Buffer` process created for that connection.
  """

  require Logger

  # Time in milliseconds that `maybe_recreate_buffer/1` waits for an exit
  # notification from the buffer before assuming it is still alive.
  @buffer_exit_wait 10

  @doc """
  Spawns a process that listens on `port` and accepts connections.

  Returns the pid of the spawned process immediately; a failure to listen is
  only logged inside that process.
  """
  def start(port) do
    spawn(fn ->
      case :gen_tcp.listen(port, [:binary, active: false, reuseaddr: true]) do
        {:ok, socket} ->
          Logger.info("Receiver listening on port #{port}")
          accept_connection(socket)

        {:error, reason} ->
          Logger.error("Could not start Receiver: #{inspect(reason)}.")
      end
    end)
  end

  @doc """
  Accepts connections on the listen `socket` until accepting fails.

  Every accepted client is served in its own spawned process. When
  `:gen_tcp.accept/1` returns an error, the error is logged and the loop ends.
  """
  def accept_connection(socket) do
    case :gen_tcp.accept(socket) do
      {:ok, client} ->
        spawn(fn -> handle_client(client) end)
        # Keep accepting. The original called an undefined `loop_accept/1` here.
        accept_connection(socket)

      {:error, :closed} ->
        Logger.warn("#{__MODULE__} restarted, so the listen socket closed.")

      {:error, reason} ->
        Logger.error("ACCEPT ERROR: #{inspect(reason)}")
    end
  end

  @doc """
  Reads from `socket` until `:gen_tcp.recv/2` returns an error, forwarding each
  chunk of data to the buffer process.

  Before each forward, a buffer that has reported its exit to the calling
  process is replaced by a fresh one (see the private `maybe_recreate_buffer/1`);
  that only happens when the caller is linked to the buffer and traps exits.
  Returns `:ok` once the socket error has been logged.
  """
  def serve(socket, buffer_pid) do
    _final_buffer_pid = serve_loop(socket, buffer_pid)
    :ok
  end

  # Body of one connection process: owns the buffer for the lifetime of the
  # connection and releases both the buffer and the socket afterwards.
  defp handle_client(client) do
    # Trap exits *before* linking to the buffer, so that a buffer crash is
    # always delivered as a message instead of killing this process.
    Process.flag(:trap_exit, true)
    {:ok, buffer_pid} = Buffer.create()

    final_buffer_pid = serve_loop(client, buffer_pid)

    # A normal exit of this process would not take the linked buffer down, so
    # stop it explicitly; the synchronous stop lets it handle queued data first.
    stop_buffer(final_buffer_pid)

    # The socket is owned by the accepting process, so it is not released when
    # this process ends; close it explicitly.
    :gen_tcp.close(client)
  end

  # Same loop as `serve/2`, but returns the buffer pid in use when the socket
  # terminated (it may differ from the one passed in after a recreation).
  defp serve_loop(socket, buffer_pid) do
    case :gen_tcp.recv(socket, 0) do
      {:ok, data} ->
        buffer_pid = maybe_recreate_buffer(buffer_pid)
        Buffer.receive(buffer_pid, data)
        serve_loop(socket, buffer_pid)

      {:error, reason} ->
        Logger.info("Socket terminating: #{inspect(reason)}")
        buffer_pid
    end
  end

  # Stops the buffer, tolerating a buffer that is already gone.
  defp stop_buffer(buffer_pid) do
    GenServer.stop(buffer_pid)
  catch
    :exit, _reason -> :ok
  end

  # Returns a usable buffer pid: a newly created buffer if an exit message for
  # `original_pid` is received within the wait time, otherwise `original_pid`.
  defp maybe_recreate_buffer(original_pid) do
    receive do
      {:EXIT, ^original_pid, _reason} ->
        {:ok, new_buffer_pid} = Buffer.create()
        new_buffer_pid
    after
      @buffer_exit_wait ->
        original_pid
    end
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment