Skip to content

Instantly share code, notes, and snippets.

@shino
Created September 29, 2022 03:33
Show Gist options
  • Save shino/f8716ac57d28a4a5b483674ed4b3e1c3 to your computer and use it in GitHub Desktop.
Save shino/f8716ac57d28a4a5b483674ed4b3e1c3 to your computer and use it in GitHub Desktop.
-module(repro).
-export([main/1]).
-export([start_server/1,
setup_logger/0]).
%% -behaviour(ra_machine).
-export([init/1,
apply/3]).
-export([write/3,
get_all/1]).
-include_lib("kernel/include/logger.hrl").
-include_lib("common_test/include/ct.hrl").
-define(CLUSTER_NAME, g).
-define(HOST, "127.0.0.1").
%% Preparation: git clone ra and make, set the path to this argument
%% e.g. $ erlc repro.erl && erl -s repro main "/path/to/ra" -s init stop
main([PathToRa]) ->
{ok, _Pid} = net_kernel:start(['spam@127.0.0.1', longnames]),
true = erlang:set_cookie(node(), spam),
_ = setup_logger(),
[ _ = file:del_dir_r(atom_to_list(Short) ++ "@" ++ ?HOST) || Short <- [n1, n2] ],
Args = pa_args(PathToRa),
{Pid1, N1} = start_node(n1, Args),
{Pid2, N2} = start_node(n2, Args),
{Pid3, N3} = start_node(n3, Args),
ok = erpc_call(N1, ra, start, []),
{ok, Started1, Failed1} = erpc_call(N1, ra, start_cluster, [default, ?CLUSTER_NAME, machine_config(), [id(N1)]]),
?LOG_INFO("ra start_cluster() at n1: started=~p, failed=~p", [Started1, Failed1]),
_ = add_member(N2, N1),
_ = timer:sleep(1000),
_ = add_member(N3, N1),
?LOG_INFO("ra:members() at n1: result=~p", [erpc_call(N1, ra, members, [id(N1)])]),
?LOG_INFO("ra:members() at n3: result=~p", [erpc_call(N3, ra, members, [id(N3)])]),
Writer = spawn_writer(N3, 50, 100),
_WriterMonitor = monitor(process, Writer),
%% Wait for writer do half jobs
receive
go ->
ok
end,
%% Stop n1 and join new node n4
ok = stop_node(N1, Pid1),
{Pid4, N4} = start_node(n4, Args),
_ = add_member(N4, N2),
?LOG_INFO("ra:members() at n3: result=~p", [erpc_call(N3, ra, members, [id(N3)])]),
?LOG_INFO("ra:members() at n4: result=~p", [erpc_call(N4, ra, members, [id(N4)])]),
%% Wait for writer finish
receive
done ->
ok;
{'DOWN', _, _, _, _} = Message ->
?LOG_ERROR("writer process down, message=~p", [Message])
end,
%% ?LOG_INFO("get_all() at n1: result=~p", [erpc_call(N1, ?MODULE, get_all, [N1])]),
%% ?LOG_INFO("get_all() at n3: result=~p", [erpc_call(N3, ?MODULE, get_all, [N3])]),
%% clean up
ok = peer:stop(Pid2),
ok = peer:stop(Pid3),
ok = peer:stop(Pid4),
?LOG_INFO("Finished!!", []),
timer:sleep(100),
ok.
add_member(NodeToAdd, Seed) ->
ok = erpc_call(NodeToAdd, ra, start, []),
AddRes = {ok, _, _} = erpc_call(NodeToAdd, ra, add_member, [id(Seed), id(NodeToAdd)]),
?LOG_INFO("ra:add_member() ~p to ~p: result=~p", [NodeToAdd, Seed, AddRes]),
StartServerRes = erpc_call(NodeToAdd, ?MODULE, start_server, [[NodeToAdd]]),
?LOG_INFO("ra:start_server() at ~p: result=~p", [NodeToAdd, StartServerRes]),
StartServerRes.
spawn_writer(Node, MidCount, Count) ->
Parent = self(),
spawn(fun() ->
Start = erlang:system_time(microsecond),
[ begin
WriteRes = erpc:call(Node, ?MODULE, write, [Node, I, I]),
[ ?LOG_INFO("Writer i=~p, result=~p", [I, WriteRes]) || WriteRes =/= okaaa ],
case I =:= MidCount of
true ->
Parent ! go;
_ ->
ok
end
end || I <- lists:seq(1, Count) ],
Duration = erlang:system_time(microsecond) - Start,
?LOG_INFO("Writer process DONE, ~p [msec] = ~p [usec]", [Duration div 1000, Duration]),
Parent ! done
end).
erpc_call(Node, Mod, Fun, Args) ->
?LOG_INFO("erpc:call(~p) | ~p:~p()", [Node, Mod, Fun]),
erpc:call(Node, Mod, Fun, Args).
pa_args(PathToRa) ->
DepsPath = filename:join([PathToRa, "deps"]),
{ok, DepsDirs0} = file:list_dir(DepsPath),
DepsDirs1 = [ filename:join([DepsPath, Dir]) || Dir <- DepsDirs0 ],
lists:append([ ["-pa", filename:join(Dir, "ebin")] || Dir <- [PathToRa | DepsDirs1] ]).
start_server(Node) ->
ra:start_server(default, ?CLUSTER_NAME, id(node()), machine_config(), [id(Node)]).
start_node(ShortName, Args) ->
{ok, PeerPid, Node} = ?CT_PEER(#{name => ShortName, host => ?HOST, args => Args}),
_ = erpc_call(Node, ?MODULE, setup_logger, []),
{PeerPid, Node}.
stop_node(Node, PeerPid) ->
StopServer = erpc_call(Node, ra, stop_server, [id(Node)]),
?LOG_INFO("ra:stop_server() at ~p: result=~p", [Node, StopServer]),
ok = peer:stop(PeerPid).
machine_config() ->
MachineInitialConfig = #{},
{module, ?MODULE, MachineInitialConfig}.
id(Node) ->
{?MODULE, Node}.
setup_logger() ->
ok = logger:set_primary_config(level, debug),
CustomFilter = fun(#{msg := {report, #{label := {application_controller, Type}}}}, _)
when Type =:= progress;
Type =:= exit ->
stop;
(#{meta := #{domain := [otp, sasl]}}, _) ->
stop;
(#{meta := Meta0} = Msg, _) ->
Msg#{meta => Meta0#{node => node()}}
end,
ok = logger:add_primary_filter(custom, {CustomFilter, no_filter_arg}),
ok = logger:remove_handler_filter(default, remote_gl),
ok = logger:set_handler_config(default, filter_default, log),
ok = logger:update_formatter_config(default, single_line, false),
ok = logger:update_formatter_config(default, legacy_header, false),
ok = logger:update_formatter_config(default,
template,
["[", node, "] ", time, " [", level, "] ", mfa, "\n ", msg, "\n"]),
ok.
%% ra_machine
write(Node, Key, Value) ->
try
%% ?LOG_INFO("write begin: node=~p, value=~p", [Node, Value]),
case ra:process_command(id(Node), {write, Key, Value}, 100) of
{ok, ok, _Leader} ->
ok;
{ok, {error, Reason}, _Leader} ->
{error, Reason};
{error, Reason} ->
{error, {ra_process_command_error, Reason}};
{timeout, _} ->
{error, timeout}
end
catch
C:E:ST ->
?LOG_ERROR("!!!!!!! write error by exception !!!!!!!!!: ~p", [{C, E, ST}]),
{error, {exception, {C, E, ST}}}
end.
get_all(Node) ->
case ra:consistent_query(id(Node),
fun(State) ->
State
end,
1000) of
{ok, State, _} ->
State;
{timeout, _} ->
{error, timeout};
{error, Reason} ->
{error, Reason}
end.
init(Config) ->
?LOG_INFO("ra_machine callback init() called: config=~p", [Config]),
#{}.
apply(_Meta, {write, Node, Value} = Cmd, State) ->
?LOG_INFO("ra_machine callback apply() called: cmd=~p", [Cmd]),
{State#{Node => Value}, ok, []};
apply(_Meta, Cmd, State) ->
?LOG_INFO("ra_machine callback apply() called: cmd=~p", [Cmd]),
{State, {error, not_implemeted}, []}.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment