Instantly share code, notes, and snippets.

@jyurek /buffer.ex Secret
Created Oct 13, 2017

Embed
What would you like to do?
defmodule Buffer do
  @moduledoc """
  Buffer the data received to parse out statements.

  The process state is a single binary. Each chunk passed to `receive/2` is
  appended to it; every complete statement (terminated by a line feed) is
  handed to `MessageSink.receive/2` together with the current time, and the
  unterminated remainder stays in the state until more data arrives.
  """
  use GenServer
  require Logger

  # Statements are delimited by a single line-feed byte.
  @eol <<10>>
  @initial_state ""

  @doc """
  Starts a buffer process linked to the caller, with an empty buffer.

  `opts` is forwarded to `GenServer.start_link/3`. Passing `name: Buffer`
  registers the process under the name that `receive/2` targets by default;
  without a name the pid must always be passed to `receive/2` explicitly.

  Returns whatever `GenServer.start_link/3` returns.
  """
  def create(opts \\ []) do
    GenServer.start_link(__MODULE__, @initial_state, opts)
  end

  @doc """
  Asynchronously hands `data` (a binary chunk) to the buffer `pid`.

  `pid` defaults to the registered name `Buffer`, which only exists when the
  buffer was started with `create(name: Buffer)`. Always returns `:ok`,
  because the message is sent with `GenServer.cast/2`.
  """
  def receive(pid \\ __MODULE__, data) do
    GenServer.cast(pid, {:receive, data})
  end

  @impl true
  def init(buffer), do: {:ok, buffer}

  @impl true
  def handle_cast({:receive, data}, buffer) do
    buffer
    |> append(data)
    |> process()
  end

  # Appends a chunk to the buffered binary; an empty chunk leaves it untouched.
  defp append(buffer, ""), do: buffer
  defp append(buffer, data), do: buffer <> data

  # Emits every complete statement currently in the buffer, then returns the
  # GenServer reply carrying the unterminated remainder as the new state.
  defp process(buffer) do
    case extract(buffer) do
      {:statement, rest, statement} ->
        MessageSink.receive(statement, Timex.now())
        process(rest)

      {:nothing, rest} ->
        {:noreply, rest}
    end
  end

  # Splits off the first statement, if the buffer contains a line feed.
  defp extract(buffer) do
    case String.split(buffer, @eol, parts: 2) do
      [statement, rest] -> {:statement, rest, statement}
      [rest] -> {:nothing, rest}
    end
  end
end
defmodule MessageSink do
  @moduledoc "Prints received statements to standard output, prefixed with a timestamp."

  # strftime-style pattern used for the timestamp prefix.
  @timestamp_format "%Y-%m-%d %H:%M:%S"

  @doc """
  Writes `message` to standard output on its own line, preceded by `time`
  formatted as `YYYY-MM-DD HH:MM:SS` and a single space.

  Raises if `Timex.format!/3` cannot format `time`.
  """
  def receive(message, time) do
    prefix = Timex.format!(time, @timestamp_format, :strftime)
    line = "#{prefix} #{message}"
    IO.puts(line)
  end
end
defmodule Receiver do
  @moduledoc """
  Minimal TCP receiver.

  Listens on a port, accepts connections in a loop, and for each connection
  spawns a process that forwards every received chunk to its own `Buffer`.
  """
  require Logger

  @doc """
  Spawns a process that opens a passive, binary-mode listen socket on `port`
  and starts accepting connections. Returns the pid of the spawned process.

  A failure to listen is logged inside the spawned process; it is not
  reported to the caller.
  """
  def start(port) do
    spawn(fn ->
      case :gen_tcp.listen(port, [:binary, active: false, reuseaddr: true]) do
        {:ok, socket} ->
          Logger.info("Receiver listening on port #{port}")
          accept_connection(socket)

        {:error, reason} ->
          Logger.error("Could not start Receiver: #{inspect reason}.")
      end
    end)
  end

  @doc """
  Accepts connections on the listen `socket` until accepting fails.

  Each accepted client is served by a freshly spawned process with its own
  `Buffer`. The loop ends (after logging) when `:gen_tcp.accept/1` returns an
  error.
  """
  def accept_connection(socket) do
    case :gen_tcp.accept(socket) do
      {:ok, client} ->
        spawn(fn ->
          # Trap exits BEFORE linking to the buffer, so a buffer crash always
          # arrives as an {:EXIT, pid, reason} message instead of killing
          # this process in the window between linking and setting the flag.
          Process.flag(:trap_exit, true)
          {:ok, buffer_pid} = Buffer.create()
          serve(client, buffer_pid)
        end)

        # Keep accepting further connections.
        accept_connection(socket)

      {:error, :closed} ->
        Logger.warn("#{__MODULE__} restarted, so the listen socket closed.")

      {:error, reason} ->
        Logger.error("ACCEPT ERROR: #{inspect reason}")
    end
  end

  @doc """
  Reads from the client `socket` until an error occurs, casting every chunk
  to the buffer process.

  Before each chunk is forwarded, a crashed buffer is replaced (see
  `maybe_recreate_buffer/1`). On a receive error the reason is logged and the
  socket is closed. Returns `:ok`.
  """
  def serve(socket, buffer_pid) do
    case :gen_tcp.recv(socket, 0) do
      {:ok, data} ->
        buffer_pid = maybe_recreate_buffer(buffer_pid)
        Buffer.receive(buffer_pid, data)
        serve(socket, buffer_pid)

      {:error, reason} ->
        Logger.info("Socket terminating: #{inspect reason}")
        # The accepted socket is owned by the long-lived accepting process,
        # not by the process serving it, so it must be closed explicitly or
        # the port stays open after the connection ends.
        :gen_tcp.close(socket)
    end
  end

  # Returns the pid to use for the next chunk: a newly created buffer when an
  # exit message for `original_pid` arrives within 10 ms, otherwise
  # `original_pid` itself. Relies on the calling process trapping exits.
  defp maybe_recreate_buffer(original_pid) do
    receive do
      {:EXIT, ^original_pid, _reason} ->
        {:ok, new_buffer_pid} = Buffer.create()
        new_buffer_pid
    after
      10 ->
        original_pid
    end
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment