-module(connection). | |
-behaviour(gen_statem). | |
-export([start_link/1, request/2]). | |
-export([callback_mode/0, init/1]). | |
-export([disconnected/3, connected/3]). | |
%% Public API. | |
start_link(Opts) -> | |
{host, Host} = proplists:lookup(host, Opts), | |
{port, Port} = proplists:lookup(port, Opts), | |
gen_statem:start_link(?MODULE, {Host, Port}, []). | |
request(Pid, Request) -> | |
gen_statem:call(Pid, {request, Request}). | |
%% gen_statem callbacks | |
callback_mode() -> [state_functions, state_enter]. | |
init({Host, Port}) -> | |
Data = #{host => Host, port => Port, requests => #{}}, | |
Actions = [{next_event, internal, connect}], | |
{ok, disconnected, Data, Actions}. | |
%% Disconnected state | |
disconnected(enter, disconnected, _Data) -> keep_state_and_data; | |
disconnected(enter, connected, #{requests := Requests} = Data) -> | |
io:format("Connection closed~n"), | |
lists:foreach(fun({_, From}) -> gen_statem:reply(From, {error, disconnected}) end, | |
Requests), | |
Data1 = maps:put(socket, undefined, Data), | |
Data2 = maps:put(requests, #{}, Data1), | |
Actions = [{{timeout, reconnect}, 500, undefined}], | |
{keep_state, Data2, Actions}; | |
disconnected(internal, connect, #{host := Host, port := Port} = Data) -> | |
case gen_tcp:connect(Host, Port, [binary, {active, true}]) of | |
{ok, Socket} -> | |
Data1 = maps:put(socket, Socket, Data), | |
{next_state, connected, Data1}; | |
{error, Error} -> | |
io:puts("Connection failed: ~ts~n", [inet:format_error(Error)]), | |
keep_state_and_data | |
end; | |
disconnected({timeout, reconnect}, _, Data) -> | |
Actions = [{next_event, internal, connect}], | |
{keep_state, Data, Actions}; | |
disconnected({call, From}, {request, _}, _Data) -> | |
Actions = [{reply, From, {error, disconnected}}], | |
{keep_state_and_data, Actions}. | |
%% Connected state | |
connected(enter, _OldState, _Data) -> keep_state_and_data; | |
connected(info, {tcp_closed, Socket}, #{socket := Socket} = Data) -> | |
{next_state, disconnected, Data}; | |
connected({call, From}, {request, Request}, #{socket := Socket} = Data) -> | |
#{id := RequestId} = Request, | |
case gen_tcp:send(Socket, encode_request(Request)) of | |
ok -> | |
#{requests := Requests} = Data, | |
Requests1 = maps:put(RequestId, From, Requests), | |
Data1 = maps:put(requests, Data, Requests1), | |
{keep_state, Data1}; | |
{error, _} -> | |
ok = gen_tcp:close(Socket), | |
{next_state, disconnected, Data} | |
end; | |
connected(info, {tcp, Socket, Packet}, #{socket := Socket} = Data) -> | |
#{requests := Requests} = Data, | |
#{id := Id} = Response = decode_response(Packet), | |
From = maps:get(Id, Requests), | |
Requests1 = maps:remove(Id, Requests), | |
Data1 = maps:put(requests, Requests1, Data), | |
gen_statem:reply(From, {ok, Response}), | |
{keep_state, Data1}. |
defmodule Connection do | |
@behaviour :gen_statem | |
require Logger | |
defstruct [:host, :port, :socket, requests: %{}] | |
## Public API | |
def start_link(opts) do | |
host = Keyword.fetch!(opts, :host) | |
port = Keyword.fetch!(opts, :port) | |
:gen_statem.start_link(__MODULE__, {String.to_charlist(host), port}, []) | |
end | |
def request(pid, request) do | |
:gen_statem.call(pid, {:request, request}) | |
end | |
## :gen_statem callbacks | |
@impl true | |
def callback_mode(), do: [:state_functions, :state_enter] | |
@impl true | |
def init({host, port}) do | |
data = %__MODULE__{host: host, port: port} | |
actions = [{:next_event, :internal, :connect}] | |
{:ok, :disconnected, data, actions} | |
end | |
## Disconnected state | |
def disconnected(:enter, :disconnected, _data), do: :keep_state_and_data | |
def disconnected(:enter, :connected, data) do | |
Logger.error("Connection closed") | |
Enum.each(data.requests, fn {_id, from} -> | |
:gen_statem.reply(from, {:error, :disconnected}) | |
end) | |
data = %{data | socket: nil, requests: %{}} | |
actions = [{{:timeout, :reconnect}, 500, nil}] | |
{:keep_state, data, actions} | |
end | |
def disconnected(:internal, :connect, data) do | |
# We use the socket in active mode for simplicity, but | |
# it's often better to use "active: :once" for better control. | |
socket_opts = [:binary, active: true] | |
case :gen_tcp.connect(data.host, data.port, socket_opts) do | |
{:ok, socket} -> | |
{:next_state, :connected, %{data | socket: socket}} | |
{:error, error} -> | |
Logger.error("Connection failed: #{:inet.format_error(error)}") | |
:keep_state_and_data | |
end | |
end | |
def disconnected({:timeout, :reconnect}, _, data) do | |
actions = [{:next_event, :internal, :connect}] | |
{:keep_state, data, actions} | |
end | |
def disconnected({:call, from}, {:request, request}, data) do | |
actions = [{:reply, from, {:error, :disconnected}}] | |
{:keep_state_and_data, actions} | |
end | |
## Connected state | |
def connected(:enter, _old_state, _data), do: :keep_state_and_data | |
def connected(:info, {:tcp_closed, socket}, %{socket: socket} = data) do | |
{:next_state, :disconnected, data} | |
end | |
def connected({:call, from}, {:request, request}, data) do | |
case :gen_tcp.send(data.socket, encode_request(request)) do | |
:ok -> | |
data = %{data | requests: Map.put(data.requests, request.id, from)} | |
{:keep_state, data} | |
{:error, _reason} -> | |
:ok = :gen_tcp.close(socket) | |
{:next_state, :disconnected, data} | |
end | |
end | |
def connected(:info, {:tcp, socket, packet}, %{socket: socket} = data) do | |
response = decode_response(packet) | |
{from, requests} = Map.pop(data.requests, response.id) | |
:gen_statem.reply(from, {:ok, response}) | |
{:keep_state, %{data | requests: requests}} | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment