-
-
Save jyurek/0c1aac357cbcc8f52007627e8658d724 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Buffer do | |
@moduledoc "Buffer the data received to parse out statements" | |
use GenServer | |
require Logger | |
@eol <<10>> | |
@initial_state "" | |
def create do | |
GenServer.start_link(__MODULE__, @initial_state) | |
end | |
def receive(pid \\ __MODULE__, data) do | |
GenServer.cast(pid, {:receive, data}) | |
end | |
def handle_cast({:receive, data}, buffer) do | |
buffer | |
|> append(data) | |
|> process | |
end | |
defp append(buffer, ""), do: buffer | |
defp append(buffer, data), do: buffer <> data | |
defp process(buffer) do | |
case extract(buffer) do | |
{:statement, buffer, statement} -> | |
MessageSink.receive(statement, Timex.now()) | |
process(buffer) | |
{:nothing, buffer} -> | |
{:noreply, buffer} | |
end | |
end | |
defp extract(buffer) do | |
case String.split(buffer, @eol, parts: 2) do | |
[match, rest] -> {:statement, rest, match} | |
[rest] -> {:nothing, rest} | |
end | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule MessageSink do | |
def receive(message, time) do | |
time_string = Timex.format!(time, "%Y-%m-%d %H:%M:%S", :strftime) | |
IO.puts("#{time_string} #{message}") | |
end | |
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Receiver do | |
require Logger | |
def start(port) do | |
spawn fn -> | |
case :gen_tcp.listen(port, [:binary, active: false, reuseaddr: true]) do | |
{:ok, socket} -> | |
Logger.info("Receiver listening on port #{port}") | |
accept_connection(socket) | |
{:error, reason} -> | |
Logger.error("Could not start Receiver: #{inspect reason}.") | |
end | |
end | |
end | |
def accept_connection(socket) do | |
case :gen_tcp.accept(socket) do | |
{:ok, client} -> | |
spawn fn -> | |
{:ok, buffer_pid} = Buffer.create() | |
Process.flag(:trap_exit, true) | |
serve(client, buffer_pid) | |
end | |
loop_accept(socket) | |
{:error, :closed} -> | |
Logger.warn("#{__MODULE__} restarted, so the listen socket closed.") | |
{:error, reason} -> | |
Logger.error("ACCEPT ERROR: #{inspect reason}") | |
end | |
end | |
def serve(socket, buffer_pid) do | |
case socket |> :gen_tcp.recv(0) do | |
{:ok, data} -> | |
buffer_pid = maybe_recreate_buffer(buffer_pid) | |
Buffer.receive(buffer_pid, data) | |
serve(socket, buffer_pid) | |
{:error, reason} -> | |
Logger.info("Socket terminating: #{inspect reason}") | |
end | |
end | |
defp maybe_recreate_buffer(original_pid) do | |
receive do | |
{:EXIT, ^original_pid, _reason} -> | |
{:ok, new_buffer_pid} = Buffer.create() | |
new_buffer_pid | |
after | |
10 -> | |
original_pid | |
end | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment