This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(connection). | |
-behaviour(gen_statem). | |
-export([start_link/1, request/2]). | |
-export([callback_mode/0, init/1]). | |
-export([disconnected/3, connected/3]). | |
%% Public API. | |
start_link(Opts) -> | |
{host, Host} = proplists:lookup(host, Opts), | |
{port, Port} = proplists:lookup(port, Opts), | |
gen_statem:start_link(?MODULE, {Host, Port}, []). | |
request(Pid, Request) -> | |
gen_statem:call(Pid, {request, Request}). | |
%% gen_statem callbacks | |
callback_mode() -> [state_functions, state_enter]. | |
init({Host, Port}) -> | |
Data = #{host => Host, port => Port, requests => #{}}, | |
Actions = [{next_event, internal, connect}], | |
{ok, disconnected, Data, Actions}. | |
%% Disconnected state | |
disconnected(enter, disconnected, _Data) -> keep_state_and_data; | |
disconnected(enter, connected, #{requests := Requests} = Data) -> | |
io:format("Connection closed~n"), | |
lists:foreach(fun({_, From}) -> gen_statem:reply(From, {error, disconnected}) end, | |
Requests), | |
Data1 = maps:put(socket, undefined, Data), | |
Data2 = maps:put(requests, #{}, Data1), | |
Actions = [{{timeout, reconnect}, 500, undefined}], | |
{keep_state, Data2, Actions}; | |
disconnected(internal, connect, #{host := Host, port := Port} = Data) -> | |
case gen_tcp:connect(Host, Port, [binary, {active, true}]) of | |
{ok, Socket} -> | |
Data1 = maps:put(socket, Socket, Data), | |
{next_state, connected, Data1}; | |
{error, Error} -> | |
io:puts("Connection failed: ~ts~n", [inet:format_error(Error)]), | |
keep_state_and_data | |
end; | |
disconnected({timeout, reconnect}, _, Data) -> | |
Actions = [{next_event, internal, connect}], | |
{keep_state, Data, Actions}; | |
disconnected({call, From}, {request, _}, _Data) -> | |
Actions = [{reply, From, {error, disconnected}}], | |
{keep_state_and_data, Actions}. | |
%% Connected state | |
connected(enter, _OldState, _Data) -> keep_state_and_data; | |
connected(info, {tcp_closed, Socket}, #{socket := Socket} = Data) -> | |
{next_state, disconnected, Data}; | |
connected({call, From}, {request, Request}, #{socket := Socket} = Data) -> | |
#{id := RequestId} = Request, | |
case gen_tcp:send(Socket, encode_request(Request)) of | |
ok -> | |
#{requests := Requests} = Data, | |
Requests1 = maps:put(RequestId, From, Requests), | |
Data1 = maps:put(requests, Data, Requests1), | |
{keep_state, Data1}; | |
{error, _} -> | |
ok = gen_tcp:close(Socket), | |
{next_state, disconnected, Data} | |
end; | |
connected(info, {tcp, Socket, Packet}, #{socket := Socket} = Data) -> | |
#{requests := Requests} = Data, | |
#{id := Id} = Response = decode_response(Packet), | |
From = maps:get(Id, Requests), | |
Requests1 = maps:remove(Id, Requests), | |
Data1 = maps:put(requests, Requests1, Data), | |
gen_statem:reply(From, {ok, Response}), | |
{keep_state, Data1}. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
defmodule Connection do | |
@behaviour :gen_statem | |
require Logger | |
defstruct [:host, :port, :socket, requests: %{}] | |
## Public API | |
def start_link(opts) do | |
host = Keyword.fetch!(opts, :host) | |
port = Keyword.fetch!(opts, :port) | |
:gen_statem.start_link(__MODULE__, {String.to_charlist(host), port}, []) | |
end | |
def request(pid, request) do | |
:gen_statem.call(pid, {:request, request}) | |
end | |
## :gen_statem callbacks | |
@impl true | |
def callback_mode(), do: [:state_functions, :state_enter] | |
@impl true | |
def init({host, port}) do | |
data = %__MODULE__{host: host, port: port} | |
actions = [{:next_event, :internal, :connect}] | |
{:ok, :disconnected, data, actions} | |
end | |
## Disconnected state | |
def disconnected(:enter, :disconnected, _data), do: :keep_state_and_data | |
def disconnected(:enter, :connected, data) do | |
Logger.error("Connection closed") | |
Enum.each(data.requests, fn {_id, from} -> | |
:gen_statem.reply(from, {:error, :disconnected}) | |
end) | |
data = %{data | socket: nil, requests: %{}} | |
actions = [{{:timeout, :reconnect}, 500, nil}] | |
{:keep_state, data, actions} | |
end | |
def disconnected(:internal, :connect, data) do | |
# We use the socket in active mode for simplicity, but | |
# it's often better to use "active: :once" for better control. | |
socket_opts = [:binary, active: true] | |
case :gen_tcp.connect(data.host, data.port, socket_opts) do | |
{:ok, socket} -> | |
{:next_state, :connected, %{data | socket: socket}} | |
{:error, error} -> | |
Logger.error("Connection failed: #{:inet.format_error(error)}") | |
:keep_state_and_data | |
end | |
end | |
def disconnected({:timeout, :reconnect}, _, data) do | |
actions = [{:next_event, :internal, :connect}] | |
{:keep_state, data, actions} | |
end | |
def disconnected({:call, from}, {:request, request}, data) do | |
actions = [{:reply, from, {:error, :disconnected}}] | |
{:keep_state_and_data, actions} | |
end | |
## Connected state | |
def connected(:enter, _old_state, _data), do: :keep_state_and_data | |
def connected(:info, {:tcp_closed, socket}, %{socket: socket} = data) do | |
{:next_state, :disconnected, data} | |
end | |
def connected({:call, from}, {:request, request}, data) do | |
case :gen_tcp.send(data.socket, encode_request(request)) do | |
:ok -> | |
data = %{data | requests: Map.put(data.requests, request.id, from)} | |
{:keep_state, data} | |
{:error, _reason} -> | |
:ok = :gen_tcp.close(socket) | |
{:next_state, :disconnected, data} | |
end | |
end | |
def connected(:info, {:tcp, socket, packet}, %{socket: socket} = data) do | |
response = decode_response(packet) | |
{from, requests} = Map.pop(data.requests, response.id) | |
:gen_statem.reply(from, {:ok, response}) | |
{:keep_state, %{data | requests: requests}} | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment