Skip to content

Instantly share code, notes, and snippets.

@whatyouhide
Last active April 12, 2023 09:49
Show Gist options
  • Star 10 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save whatyouhide/e7531e10128af58b9830af8938eae478 to your computer and use it in GitHub Desktop.
Save whatyouhide/e7531e10128af58b9830af8938eae478 to your computer and use it in GitHub Desktop.
-module(connection).
-behaviour(gen_statem).
-export([start_link/1, request/2]).
-export([callback_mode/0, init/1]).
-export([disconnected/3, connected/3]).
%% Public API.
%% @doc Starts a connection state machine linked to the calling process.
%%
%% `Opts' is a property list that must contain both a `host' and a `port'
%% entry. `proplists:lookup/2' returns `none' for a missing key, so an
%% incomplete option list fails the corresponding match with `badmatch'.
%% The `{Host, Port}' pair is handed to `init/1' as the start argument.
start_link(Opts) ->
    {host, Address} = proplists:lookup(host, Opts),
    {port, PortNumber} = proplists:lookup(port, Opts),
    InitArg = {Address, PortNumber},
    gen_statem:start_link(?MODULE, InitArg, []).
%% @doc Synchronously submits `Request' to the connection process `Pid'.
%%
%% The request is wrapped as `{request, Request}' and sent with
%% `gen_statem:call/2'; whatever the state machine replies is returned as-is.
request(Pid, Request) ->
    Message = {request, Request},
    gen_statem:call(Pid, Message).
%% gen_statem callbacks
%% @doc gen_statem callback: one callback function per state
%% (`state_functions'), with state enter calls enabled (`state_enter').
callback_mode() ->
    [state_functions, state_enter].
%% @doc gen_statem callback: builds the initial data map and starts in the
%% `disconnected' state.
%%
%% The data map holds the target `host' and `port' plus an empty `requests'
%% map. An internal `connect' event is queued so the first connection attempt
%% is handled by the `disconnected' state callback right after start-up.
init({Host, Port}) ->
    InitialData = #{requests => #{}, host => Host, port => Port},
    ConnectNow = {next_event, internal, connect},
    {ok, disconnected, InitialData, [ConnectNow]}.
%% Disconnected state
%% @doc State callback for the `disconnected' state.
%%
%% Handled events:
%%   - `enter' from `disconnected' (initial start-up): nothing to do.
%%   - `enter' from `connected': every pending caller is answered with
%%     `{error, disconnected}', the socket and pending requests are cleared,
%%     and a reconnect attempt is scheduled in 500 ms.
%%   - `internal, connect': tries to open the TCP connection. On success the
%%     machine moves to `connected'; on failure the error is printed and
%%     another attempt is scheduled in 500 ms.
%%   - `{timeout, reconnect}': queues an internal `connect' event.
%%   - `{call, From}' with a `{request, _}': answered immediately with
%%     `{error, disconnected}'.
disconnected(enter, disconnected, _Data) ->
    keep_state_and_data;
disconnected(enter, connected, #{requests := Requests} = Data) ->
    io:format("Connection closed~n"),
    %% `Requests' is a map of RequestId => From, so iterate over its values
    %% (lists:foreach/2 cannot be applied to the map itself).
    lists:foreach(
        fun(From) -> gen_statem:reply(From, {error, disconnected}) end,
        maps:values(Requests)
    ),
    Data1 = Data#{socket => undefined, requests := #{}},
    Actions = [{{timeout, reconnect}, 500, undefined}],
    {keep_state, Data1, Actions};
disconnected(internal, connect, #{host := Host, port := Port} = Data) ->
    case gen_tcp:connect(Host, Port, [binary, {active, true}]) of
        {ok, Socket} ->
            {next_state, connected, Data#{socket => Socket}};
        {error, Error} ->
            io:format("Connection failed: ~ts~n", [inet:format_error(Error)]),
            %% Without re-arming the timer a single failed attempt would
            %% leave the process disconnected forever.
            Actions = [{{timeout, reconnect}, 500, undefined}],
            {keep_state_and_data, Actions}
    end;
disconnected({timeout, reconnect}, _, Data) ->
    Actions = [{next_event, internal, connect}],
    {keep_state, Data, Actions};
disconnected({call, From}, {request, _}, _Data) ->
    Actions = [{reply, From, {error, disconnected}}],
    {keep_state_and_data, Actions}.
%% Connected state
%% @doc State callback for the `connected' state.
%%
%% Handled events:
%%   - `enter': nothing to do.
%%   - `info, {tcp_closed, Socket}' for the current socket: moves to
%%     `disconnected'.
%%   - `{call, From}' with `{request, Request}': `Request' must be a map with
%%     an `id' key. The encoded request is written to the socket and `From'
%%     is stored under that id until the matching response arrives. If the
%%     send fails, the socket is closed, the caller is answered with
%%     `{error, disconnected}' and the machine moves to `disconnected'.
%%   - `info, {tcp, Socket, Packet}' for the current socket: the packet is
%%     decoded, the caller stored under the response id is answered with
%%     `{ok, Response}' and removed from the pending requests. A response
%%     whose id is not pending fails the `maps:take/2' match and crashes
%%     the process.
%%
%% NOTE(review): encode_request/1 and decode_response/1 are not defined in the
%% visible part of this file; they are assumed to exist elsewhere in the module.
connected(enter, _OldState, _Data) ->
    keep_state_and_data;
connected(info, {tcp_closed, Socket}, #{socket := Socket} = Data) ->
    {next_state, disconnected, Data};
connected({call, From}, {request, Request}, #{socket := Socket} = Data) ->
    #{id := RequestId} = Request,
    case gen_tcp:send(Socket, encode_request(Request)) of
        ok ->
            #{requests := Requests} = Data,
            %% Store the caller in the `requests' map and put that map back
            %% into the state data (key, value, map - in that order).
            Requests1 = maps:put(RequestId, From, Requests),
            Data1 = maps:put(requests, Requests1, Data),
            {keep_state, Data1};
        {error, _} ->
            ok = gen_tcp:close(Socket),
            %% This caller was never added to `requests', so it must be
            %% answered here; otherwise its call would never get a reply.
            Actions = [{reply, From, {error, disconnected}}],
            {next_state, disconnected, Data, Actions}
    end;
connected(info, {tcp, Socket, Packet}, #{socket := Socket} = Data) ->
    #{requests := Requests} = Data,
    #{id := Id} = Response = decode_response(Packet),
    {From, Requests1} = maps:take(Id, Requests),
    Data1 = maps:put(requests, Requests1, Data),
    gen_statem:reply(From, {ok, Response}),
    {keep_state, Data1}.
defmodule Connection do
  @moduledoc """
  A `:gen_statem`-based TCP client that keeps a connection open and
  reconnects when it is lost.

  The state machine has two states:

    * `:disconnected` - no socket is open. Requests are answered immediately
      with `{:error, :disconnected}` and connection attempts are retried on a
      timer.
    * `:connected` - a socket is open. Requests are written to the socket and
      the caller is answered when a response with the same `id` arrives.

  NOTE(review): `encode_request/1` and `decode_response/1` are called below but
  are not defined in the visible part of this module; they are assumed to be
  provided elsewhere.
  """

  @behaviour :gen_statem

  require Logger

  defstruct [:host, :port, :socket, requests: %{}]

  # Delay, in milliseconds, before a reconnect attempt is made.
  @reconnect_interval 500

  ## Public API

  @doc """
  Starts the connection process linked to the caller.

  `opts` must contain `:host` (a string) and `:port`; `Keyword.fetch!/2`
  raises if either is missing.
  """
  def start_link(opts) do
    host = Keyword.fetch!(opts, :host)
    port = Keyword.fetch!(opts, :port)
    :gen_statem.start_link(__MODULE__, {String.to_charlist(host), port}, [])
  end

  @doc """
  Synchronously sends `request` through the connection process `pid` and
  returns the state machine's reply.
  """
  def request(pid, request) do
    :gen_statem.call(pid, {:request, request})
  end

  ## :gen_statem callbacks

  @impl true
  def callback_mode(), do: [:state_functions, :state_enter]

  @impl true
  def init({host, port}) do
    data = %__MODULE__{host: host, port: port}
    # Queue the first connection attempt right after start-up.
    actions = [{:next_event, :internal, :connect}]
    {:ok, :disconnected, data, actions}
  end

  ## Disconnected state

  @doc false
  def disconnected(:enter, :disconnected, _data), do: :keep_state_and_data

  def disconnected(:enter, :connected, data) do
    Logger.error("Connection closed")

    # Every caller still waiting for a response is told the connection is gone.
    Enum.each(data.requests, fn {_id, from} ->
      :gen_statem.reply(from, {:error, :disconnected})
    end)

    data = %{data | socket: nil, requests: %{}}
    actions = [{{:timeout, :reconnect}, @reconnect_interval, nil}]
    {:keep_state, data, actions}
  end

  def disconnected(:internal, :connect, data) do
    # We use the socket in active mode for simplicity, but
    # it's often better to use "active: :once" for better control.
    socket_opts = [:binary, active: true]

    case :gen_tcp.connect(data.host, data.port, socket_opts) do
      {:ok, socket} ->
        {:next_state, :connected, %{data | socket: socket}}

      {:error, error} ->
        Logger.error("Connection failed: #{:inet.format_error(error)}")
        # Re-arm the timer; otherwise one failed attempt would leave the
        # process disconnected forever.
        actions = [{{:timeout, :reconnect}, @reconnect_interval, nil}]
        {:keep_state_and_data, actions}
    end
  end

  def disconnected({:timeout, :reconnect}, _, data) do
    actions = [{:next_event, :internal, :connect}]
    {:keep_state, data, actions}
  end

  def disconnected({:call, from}, {:request, _request}, _data) do
    actions = [{:reply, from, {:error, :disconnected}}]
    {:keep_state_and_data, actions}
  end

  ## Connected state

  @doc false
  def connected(:enter, _old_state, _data), do: :keep_state_and_data

  def connected(:info, {:tcp_closed, socket}, %{socket: socket} = data) do
    {:next_state, :disconnected, data}
  end

  def connected({:call, from}, {:request, request}, %{socket: socket} = data) do
    case :gen_tcp.send(socket, encode_request(request)) do
      :ok ->
        data = %{data | requests: Map.put(data.requests, request.id, from)}
        {:keep_state, data}

      {:error, _reason} ->
        :ok = :gen_tcp.close(socket)
        # This caller was never stored in `requests`, so it must be answered
        # here; otherwise its call would never get a reply.
        actions = [{:reply, from, {:error, :disconnected}}]
        {:next_state, :disconnected, data, actions}
    end
  end

  def connected(:info, {:tcp, socket, packet}, %{socket: socket} = data) do
    response = decode_response(packet)
    {from, requests} = Map.pop(data.requests, response.id)
    :gen_statem.reply(from, {:ok, response})
    {:keep_state, %{data | requests: requests}}
  end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment