gen_server: concurrent vs. serial calls.
-module(ping_server).
-behaviour(gen_server).

-export([start/0, serial_burst/2, concurrent_burst/2]).

%% gen_server callbacks.
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3,
         terminate/2]).

-define(TIMEOUT, 5000).

start() -> gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

%% Sends message to Node.
send_message(Node, Message, N) ->
    try
        Result = gen_server:call({?MODULE, Node}, Message, ?TIMEOUT),
        io:format("~p message ~p response: ~p~n", [Message, N, Result]),
        Result
    catch
        _:_ ->
            io:format("~p message ~p timeout~n", [Message, N]),
            pang
    end.

%% Sends a burst of Size with the same Message to Node.
ping_burst(Node, Message, Size) ->
    lists:map(fun(X) ->
                  spawn(fun() ->
                            send_message(Node, Message, X)
                        end)
              end,
              lists:seq(0, Size - 1)).

serial_burst(Node, Size) ->
    ping_burst(Node, serial_ping, Size),
    ok.

concurrent_burst(Node, Size) ->
    ping_burst(Node, concurrent_ping, Size),
    ok.

%% Heavy function simulation.
heavy(MaxDelay) -> timer:sleep(random:uniform(MaxDelay)).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% gen_server callbacks implementation.

init([]) -> {ok, {max_delay, 2000}}.

handle_call(serial_ping, _From, State = {max_delay, MaxDelay}) ->
    heavy(MaxDelay),
    {reply, pong, State};
handle_call(concurrent_ping, From, State) ->
    ?MODULE ! {concurrent_ping, From},
    {noreply, State};
handle_call(stop, _From, State) ->
    {stop, normal, ok, State};
handle_call(_, _, State) ->
    {reply, not_implemented, State}.

handle_info({concurrent_ping, From}, State = {max_delay, MaxDelay}) ->
    spawn(fun() ->
              heavy(MaxDelay),
              gen_server:reply(From, pong)
          end),
    {noreply, State};
handle_info(_Message, State) ->
    {noreply, State}.

handle_cast(_, State) -> {noreply, State}.

code_change(_OldVsn, State, _Extra) -> {ok, State}.

terminate(_Reason, _State) -> ok.
This code example shows both serial and concurrent services in a gen_server.

By default, a gen_server is a serial server: responses are usually generated in the handle_call callback and sent back to the client. If the server takes too long to prepare a response to a message, the other clients in the queue must wait in line, and if those clients have timeouts on their calls, the calls will fail with an error on the client side.
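
For instance, a call that outlives its timeout makes the caller exit with a timeout error, which is what send_message/3 above catches. A minimal sketch of what the client sees (the busy_server name, the slow_request message and the 1000 ms timeout are hypothetical):

try
    gen_server:call(busy_server, slow_request, 1000)  % 1000 ms client-side timeout.
catch
    exit:{timeout, _} ->
        %% The caller gives up, but the server keeps processing the request
        %% and will eventually produce a reply nobody is waiting for.
        timeout
end.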

Things to note:

  • When the server receives the messages serial_ping and concurrent_ping, it executes a function called heavy/1 that takes from 1 millisecond up to the maximum delay (2000 ms) to return. This simulates a function that takes too long to execute.
  • To start the server in an Erlang node called gandalf@127.0.0.1, execute the following:
alexdesousa@arya: ~$ erl -name gandalf@127.0.0.1
Erlang/OTP 17 [erts-6.2] [source] [64-bit] [smp:2:2] [async-threads:10] [kernel-poll:false]

Eshell V6.2  (abort with ^G)
(gandalf@127.0.0.1)1> c(["ping_server"]).
{ok,ping_server}
(gandalf@127.0.0.1)2> ping_server:start().
{ok,<0.46.0>}

Serial service

The serial service is implemented as a clause of the gen_server callback
handle_call:

(...)
handle_call(serial_ping, _From, State = {max_delay, MaxDelay}) ->
    heavy(MaxDelay),
    {reply, pong, State};
(...)

When using the function serial_burst, which sends a burst of Size messages to the serial_ping service on Node, this is the output:

(gandalf@127.0.0.1)3> ping_server:serial_burst(node(), 10).
ok
serial_ping message 0 response: pong
serial_ping message 1 response: pong
serial_ping message 2 response: pong
serial_ping message 9 timeout
serial_ping message 8 timeout
serial_ping message 7 timeout
serial_ping message 6 timeout
serial_ping message 5 timeout
serial_ping message 4 timeout
serial_ping message 3 timeout
(gandalf@127.0.0.1)4>

Of the 10 requests, only 3 got a response. One way to fix this is to hand the
time-consuming job to another process.

Concurrent service

The concurrent service is implemented as a clause of the gen_server callback
handle_call, in conjunction with a clause of the handle_info callback where the
server spawns a process to do the job:

(...)
handle_call(concurrent_ping, From, State) ->
    ?MODULE ! {concurrent_ping, From},
    {noreply, State};
(...)
handle_info({concurrent_ping, From}, State = {max_delay, MaxDelay}) ->
    spawn(fun() ->
            heavy(MaxDelay),
            gen_server:reply(From, pong)
          end),
    {noreply, State};
(...)

This time all 10 requests got a response from the server:

(gandalf@127.0.0.1)4> ping_server:concurrent_burst(node(), 10).
ok
concurrent_ping message 9 response: pong
concurrent_ping message 8 response: pong
concurrent_ping message 7 response: pong
concurrent_ping message 6 response: pong
concurrent_ping message 5 response: pong
concurrent_ping message 4 response: pong
concurrent_ping message 3 response: pong
concurrent_ping message 2 response: pong
concurrent_ping message 1 response: pong
concurrent_ping message 0 response: pong

Though this works, implementing concurrent servers this way in Erlang can get a
bit messy.

A Robust Solution

The server has several flaws:

  • If an error occurs, the server dies (a plain supervisor would mitigate this; see the sketch after this list).
  • The code can get a bit messy.
  • The process serving the client (concurrent service) dies when the connection ends. This is not efficient when the client sends a lot of requests in a short period of time.
  • If there are too many serial clients, the concurrent clients will be affected, because the queue is the same for all of them.
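
As a point of comparison for the first flaw, this is roughly what a plain OTP supervisor for ping_server would look like. It is only a sketch: the module name ping_server_sup and the restart intensity are illustrative, not part of the gist.

-module(ping_server_sup).
-behaviour(supervisor).

-export([start_link/0]).
%% supervisor callback.
-export([init/1]).

start_link() ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, []).

init([]) ->
    %% one_for_one: restart only the crashed child, at most 5 times in 10 seconds.
    ChildSpec = {ping_server,
                 {ping_server, start, []},
                 permanent, 5000, worker, [ping_server]},
    {ok, {{one_for_one, 5, 10}, [ChildSpec]}}.

This removes the first flaw, but it does nothing about the shared queue or the per-request processes.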

I implemented tentacles_server, a gen_server wrapper that takes care of all these flaws.

The same server can be accomplished with:

-module(tentacles_ping_server_controller).

-behaviour(tentacles_controller).

-export([start/0, serial_burst/2, concurrent_burst/2]).

% Callbacks.
-export([init/2, handle_message/2, handle_timeout/1, handle_event/2,
         handle_termination/2]).

start() ->
    tentacles_server_sup:start_link(example, []).

%% Sends message to Node.
send_message(Node, Type, ConnectionID, N) ->
    try
        Result = case Type of
            serial ->
              tentacles_dispatcher:sync_message(example, Node, ConnectionID, ping);
            concurrent ->
              tentacles_dispatcher:async_message(example, Node, ConnectionID, ping);
            _ ->
              none
        end,
        io:format("~p message ~p response: ~p~n", [ping, N, Result]),
        Result
    catch
        _:_ ->
            io:format("~p message ~p timeout~n", [ping, N]),
            pang
    end.

%% Sends a burst of Size with the same message to Node.
ping_burst(Node, Type, ConnectionID, Size) ->
    lists:map(fun(X) ->
                spawn(fun() ->
                            send_message(Node, Type, ConnectionID, X)
                      end)
              end,
              lists:seq(0, Size - 1)).

serial_burst(Node, Size) ->
    ping_burst(Node, serial, 'serial_client', Size),
    ok.

concurrent_burst(Node, Size) ->
    ping_burst(Node, concurrent, 'concurrent_client', Size),
    ok.

%% Heavy function simulation.
heavy(MaxDelay) -> timer:sleep(random:uniform(MaxDelay)).

%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%  call-backs implementation.

init(_BaseName, _Id) -> {ok, {max_delay, 2000}}.

handle_message(ping, State = {max_delay, MaxDelay}) ->
    heavy(MaxDelay),
    {reply, pong, State};
handle_message(_Any, State) ->
    {noreply, State}.

handle_timeout(State) ->
    {noreply, State}.

handle_event(_, State) ->
    {noreply, State}.

handle_termination(_Reason, _State) ->
    ok.
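
Assuming the tentacles application and this controller module are compiled and on the code path, the server can be started and exercised from the shell in the same way as before (a sketch; the responses are omitted):

(gandalf@127.0.0.1)1> tentacles_ping_server_controller:start().
(gandalf@127.0.0.1)2> tentacles_ping_server_controller:serial_burst(node(), 10).
(gandalf@127.0.0.1)3> tentacles_ping_server_controller:concurrent_burst(node(), 10).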

Advantages:

  • The server has a supervisor that restarts it in case of failure, so the
    probability of it being down is very low.
  • Serial clients won't affect the concurrent clients.
  • The code is easier to read.
