Mix.install([
{:dsmr, github: "mijnverbruik/dsmr"},
{:kino, "~> 0.12.0"},
{:kino_vega_lite, "~> 0.1.10"}
])
alias VegaLite, as: Vl
defmodule Meter do
use GenServer
require Logger
@connect_timeout 5000
@recv_timeout 5000
def listen(fun, opts) do
GenServer.start_link(__MODULE__, {fun, opts}, name: __MODULE__)
end
@impl true
def init({fun, opts}) do
{:ok, host} = parse_host(opts[:host])
state = %{socket: nil, fun: fun, lines: ""}
{:ok, state, {:continue, host: host, port: opts[:port]}}
end
@impl true
def handle_continue(opts, state) do
socket_opts = [:binary, active: false, packet: :line]
case :gen_tcp.connect(opts[:host], opts[:port], socket_opts, @connect_timeout) do
{:ok, socket} ->
send(self(), :recv_loop)
{:noreply, %{state | socket: socket}}
{:error, reason} ->
Logger.error("Unable to connect to meter - reason: #{inspect(reason)}")
{:stop, :normal, state}
end
end
@impl true
def handle_info(:recv_loop, state) do
case :gen_tcp.recv(state.socket, 0, @recv_timeout) do
{:ok, line} ->
send(self(), {:recv_line, line})
send(self(), :recv_loop)
{:noreply, state}
{:error, reason} ->
Logger.error("Unable to connect to remote TCP socket - reason: #{inspect(reason)}")
{:stop, :normal, state}
end
end
@impl true
def handle_info({:recv_line, "!" <> _ = line}, state) do
if state.lines != "" do
send(self(), {:telegram, state.lines <> line})
end
{:noreply, %{state | lines: ""}}
end
@impl true
def handle_info({:recv_line, "/" <> _ = line}, %{lines: ""} = state) do
# Ignore partially received telegram when starting to read from socket.
{:noreply, %{state | lines: line}}
end
@impl true
def handle_info({:recv_line, _line}, %{lines: ""} = state) do
# Ignore partially received telegram when starting to read from socket.
{:noreply, state}
end
@impl true
def handle_info({:recv_line, line}, state) do
{:noreply, %{state | lines: state.lines <> line}}
end
@impl true
def handle_info({:telegram, raw}, state) do
case DSMR.parse(raw) do
{:ok, telegram} ->
Logger.info("Received telegram - #{telegram.checksum}")
state.fun.(telegram)
{:error, reason} ->
Logger.error("Unable to parse telegram - reason: #{inspect(reason)}")
end
{:noreply, state}
end
defp parse_host(host) when is_binary(host) do
parse_host(String.to_charlist(host))
end
defp parse_host(host) do
case :inet.parse_address(host) do
{:ok, ip} -> {:ok, ip}
{:error, :einval} -> {:ok, host}
end
end
end
usage_plot =
Vl.new(width: 600, height: 400, padding: 20)
|> Vl.repeat(
[layer: ["delivered", "returned"]],
Vl.new()
|> Vl.mark(:line)
|> Vl.encode_field(:x, "date", type: :temporal, title: "Measurement")
|> Vl.encode_repeat(:y, :layer, type: :quantitative, title: "Electricity Usage (kW)")
|> Vl.encode(:color, datum: [repeat: :layer], type: :nominal)
)
|> Kino.VegaLite.new()
telegram_measured_at = fn
{[0, 0, 1, 0, 0], _} -> true
{_, _} -> false
end
telegram_currently_delivered = fn
{[1, 0, 1, 7, 0], _} -> true
{_, _} -> false
end
telegram_currently_returned = fn
{[1, 0, 2, 7, 0], _} -> true
{_, _} -> false
end
{:ok, pid} =
Meter.listen(
fn telegram ->
{_, %DSMR.Timestamp{value: date}} = Enum.find(telegram.data, telegram_measured_at)
{_, %DSMR.Measurement{value: delivered}} =
Enum.find(telegram.data, telegram_currently_delivered)
{_, %DSMR.Measurement{value: returned}} =
Enum.find(telegram.data, telegram_currently_returned)
Kino.VegaLite.push(usage_plot, %{date: date, delivered: delivered, returned: returned},
window: 300
)
end,
host: "10.10.0.129",
port: 23
)
GenServer.stop(pid, :shutdown)