Skip to content

Instantly share code, notes, and snippets.

@sile
Last active August 29, 2015 14:12
Show Gist options
  • Save sile/400885329412f65ce3d9 to your computer and use it in GitHub Desktop.
Save sile/400885329412f65ce3d9 to your computer and use it in GitHub Desktop.
分散Erlang周りの性能測定メモ ref: http://qiita.com/sile/items/1852425f0f70e790ebea
-module(dist_bench).
%%% External API
-export([
%% utility
make_node_specs/2,
setup_nodes/1,
do_bench/3,
%% scale out benchmarks
pid_msg_echo_bench/2,
name_msg_echo_bench/2,
rpc_call_bench/2,
monitor_bench/2,
global_bench/1,
pg2_bench/2,
%% scale up benchmarks
pid_msg_echo_scaleup_bench/3,
rpc_call_scaleup_bench/3
]).
%%% Internal API
-export([
echo_server_start/1,
do_echo/2,
do_rpc_call/2,
make_echo_msg/1
]).
%%% Types
-export_type([
node_spec/0,
node_specs/0,
microseconds/0
]).
-type node_spec() :: {HostName::atom(), NodeCount::non_neg_integer()}. % `HostName'上で何個のノードを起動するか
-type node_specs() :: [node_spec()].
-type microseconds() :: non_neg_integer().
%%% External Functions: Utility
%% @doc `TotalNodeCount'で指定された数のノード群を生成するための`node_specs()'を生成して返す
%%
%% `HostNames'で指定された各マシン上で起動されるノード数は、極力等しい数になるように調整される
%%
%% ```
%% > make_node_specs([a,b,c], 10).
%% [{a,4},{b,3},{c,3}]
%% '''
-spec make_node_specs([HostName::atom()], TotalNodeCount::non_neg_integer()) -> node_specs().
make_node_specs(HostNames, TotalNodeCount) ->
PerHostNodeCount = TotalNodeCount div length(HostNames),
RemainderCount = TotalNodeCount rem length(HostNames),
{Specs, _} =
lists:mapfoldl(
fun (Host, Rem) ->
{{Host, PerHostNodeCount + min(1, Rem)}, max(0, Rem - 1)}
end,
RemainderCount,
HostNames),
Specs.
%% @doc スレーブノード群を起動する
%%
%% 呼び出し元プロセスが終了した場合、起動したノード群も自動で終了する(リンクが貼られている)
-spec setup_nodes(node_specs()) -> {SlaveNodeCount, StartAverateTime} when
SlaveNodeCount :: non_neg_integer(), % 起動したスレーブノードの数
StartAverateTime :: microseconds(). % スレーブノードの起動に要した時間の平均値
setup_nodes(NodeSpecs) ->
[] = nodes(),
Tooks =
lists:append(
[[begin
NodeName = list_to_atom(lists:flatten(io_lib:format("slave_~p", [I]))), % スレーブノードの名前は'slave_通し番号'形式
{Took, {ok, _}} = timer:tc(fun () -> slave:start_link(HostName, NodeName) end),
Took
end || I <- lists:seq(0, NodeCount - 1)]
|| {HostName, NodeCount} <- NodeSpecs]),
{_, Bin, File} = code:get_object_code(?MODULE),
rpc:multicall(nodes(), code, load_binary, [?MODULE, File, Bin]),
TookAverage = lists:sum(Tooks) / max(1, length(Tooks)),
{length(Tooks), TookAverage}.
%% @doc スレーブノード群を準備した後に、`BenchFun'で指定されたベンチマークを実行する
-spec do_bench(node_specs(), BenchCount, BenchFun) -> [Result] when
BenchCount :: non_neg_integer(), % `BenchFun'を何回実行するか
BenchFun :: fun (() -> Result), % 個々のベンチマークを実行する関数
Result :: term().
do_bench(NodeSpecs, BenchCount, BenchFun) ->
{Pid, Monitor} =
spawn_monitor(
fun () -> % ベンチマーク終了時にスレーブノード群を確実に終了させたいので、別プロセスで実行する
io:format("# setup nodes ... "),
{NodeCount, StartAverateTime} = setup_nodes(NodeSpecs),
io:format("done: count=~p, start_avg_time=~p sec\n", [NodeCount, StartAverateTime / 1000000]),
Results =
[begin
io:format("# [~p] do bench ... ", [I]),
lists:foreach(fun erlang:garbage_collect/1, erlang:processes()),
{Took, Result} = timer:tc(BenchFun),
io:format("done: time=~p sec\n", [Took / 1000000]),
Result
end || I <- lists:seq(1, BenchCount)],
io:format("\n"),
exit({bench_finished, Results})
end),
receive
{'DOWN', Monitor, _, _, Reason} ->
case Reason of
{bench_finished, Results} -> Results;
_ -> error({bench_process_down, Pid, Reason})
end
end.
%%% External Functions: Scale Out Benchmarks
%% @doc PID指定によるメッセージ送受信のスケールアウト性能を測定するためのベンチマーク関数
-spec pid_msg_echo_bench(non_neg_integer(), [node()]) -> AverageEchoTime::microseconds().
pid_msg_echo_bench(EchoCount, Nodes) ->
%% 1) リモートノードでechoサーバプロセスを起動する
RemoteProcs = [spawn_link(Node, ?MODULE, echo_server_start, [undefined]) || Node <- Nodes],
%% 2) `EchoCount'回だけリモートノード上のプロセスとメッセージをやりとり(echo)し、その所要時間を測定する
{MicroSeconds, _} =timer:tc(fun () -> do_echo(EchoCount, RemoteProcs) end),
%% 3) 後始末
lists:foreach(fun (Pid) -> unlink(Pid), exit(Pid, kill) end, RemoteProcs),
MicroSeconds / EchoCount. % 一回のやりとりに要した平均時間を返す
%% @doc プロセス名指定によるメッセージ送信(+ PID指定による受信)のスケールアウト性能を測定するためのベンチマーク関数
-spec name_msg_echo_bench(non_neg_integer(), [node()]) -> AverageEchoTime::microseconds().
name_msg_echo_bench(EchoCount, Nodes) ->
%% 1) リモートノードでechoサーバプロセスを起動する
RemoteProcs = [spawn_link(Node, ?MODULE, echo_server_start, [echo_server]) || Node <- Nodes],
RemoteNames = [{echo_server, node(Pid)} || Pid <- RemoteProcs],
timer:sleep(100), % NOTE: リモートノード上での名前登録が完了するのを適当な時間だけ待つ (sleepを使っているのは手抜き)
%% 2) `EchoCount'回だけリモートノード上のプロセスとメッセージをやりとり(echo)し、その所要時間を測定する
{MicroSeconds, _} = timer:tc(fun () -> do_echo(EchoCount, RemoteNames) end),
%% 3) 後始末
lists:foreach(fun (Pid) -> unlink(Pid), exit(Pid, kill) end, RemoteProcs),
MicroSeconds / EchoCount. % 一回のやりとりに要した平均時間を返す
%% @doc rpcモジュールを用いた関数呼び出しのスケールアウト性能を測定するためのベンチマーク関数
-spec rpc_call_bench(non_neg_integer(), [node()]) -> AverageCallTime::microseconds().
rpc_call_bench(CallCount, Nodes) ->
%% 1) `CallCount'回だけリモートノードに対してRPC呼び出しを行い、その所要時間を測定する
{MicroSeconds, _} = timer:tc(fun () -> do_rpc_call(CallCount, Nodes) end),
MicroSeconds / CallCount.
%% @doc `erlang:monitor/2'を用いたプロセス死活監視のスケールアウト性能を測定するためのベンチマーク関数
-spec monitor_bench(non_neg_integer(), [node()]) -> AverateTime::microseconds().
monitor_bench(ProcessCount, Nodes) ->
%% 1) リモートノード上で`ProcessCount'個のプロセスを生成する
RemoteProcs = [spawn(Node, timer, sleep, [infinity]) ||
Node <- Nodes,
_ <- lists:seq(1, ProcessCount, length(Nodes))], % NOTE: 剰余分が捨てられてしまう
RealProcessCount = length(RemoteProcs),
%% 2) リモートプロセス群に対する監視登録およびダウン通知の受信に掛かった時間を測定する
{MicroSeconds, _} =
timer:tc(
fun () ->
lists:foreach(fun (Pid) -> monitor(process, Pid) end, RemoteProcs),
lists:foreach(fun (Pid) -> exit(Pid, kill) end, RemoteProcs),
wait_downs(RealProcessCount) % 全ての監視プロセスの'DOWN'メッセージを受け取るまで待機する
end),
MicroSeconds / RealProcessCount.
%% @doc globalモジュールの各種関数のスケールアウト性能を測定するためのベンチマーク関数
-spec global_bench(NameCount) -> [ResultEntry] when
NameCount :: non_neg_integer(), % グローバルなプロセス名の登録/解除/検索を何回行なうか
ResultEntry :: {register, microseconds()} % `global:register_name/2'呼び出しの平均所要時間
| {unregister, microseconds()} % `global:unregister_name/2'呼び出しの平均所要時間
| {whereis, microseconds()}. % `global:whereis_name/2'呼び出しの平均所要時間
global_bench(NameCount) ->
%% 1) ベンチマークに用いる名前およびプロセスの準備
Names = [{?MODULE, I} || I <- lists:seq(1, NameCount)],
Procs = [spawn_link(timer, sleep, [infinity]) || _ <- lists:seq(1, NameCount)],
Pairs = lists:zip(Names, Procs),
%% 2) 各種関数の実行時間を測定
{RegisterMicros, _} = % 登録
timer:tc(
fun () -> lists:foreach(fun ({Name, Proc}) -> yes = global:register_name(Name, Proc) end, Pairs) end),
{WhereisMicros, _} = % 検索
timer:tc(
fun () -> lists:foreach(fun ({Name, Proc}) -> Proc = global:whereis_name(Name) end, Pairs) end),
{UnregisterMicros, _} = % 登録解除
timer:tc(
fun () -> lists:foreach(fun (Name) -> global:unregister_name(Name) end, Names) end),
%% 3) 後始末
lists:foreach(fun (Pid) -> unlink(Pid), exit(Pid, kill) end, Procs),
[
{register, RegisterMicros / NameCount},
{unregister, UnregisterMicros / NameCount},
{whereis, WhereisMicros / NameCount}
].
%% @doc pg2モジュールの各種関数のスケールアウト性能を測定するためのベンチマーク関数
-spec pg2_bench(GroupCount, MemberCount) -> [ResultEntry] when
GroupCount :: non_neg_integer(), % `pg2:create/1'を使って作成するグループの数
MemberCount :: non_neg_integer(), % `pg2:join/2'を使ってグループに参加させるプロセスの数
ResultEntry :: {create, microseconds()} % `pg2:create/1'呼び出しの平均所要時間
| {join, microseconds()} % `pg2:join/2'呼び出しの平均所要時間
| {select, microseconds()}. % `pg2:get_closest_pid/1'呼び出しの平均所要時間
pg2_bench(GroupCount, MemberCount) ->
%% 1) ベンチマークに用いるグループ名およびプロセスの準備
[] = pg2:which_groups(),
Groups = [{group, I} || I <- lists:seq(1, GroupCount)],
Members = [spawn_link(timer, sleep, [infinity]) || _ <- lists:seq(1, MemberCount)],
%% 2) 各種関数の実行時間を測地
{CreateMicros, _} =
timer:tc(
fun () -> lists:foreach(fun pg2:create/1, Groups) end),
{JoinMicros, _} =
timer:tc(
fun () -> lists:foreach(fun (Pid) -> ok = pg2:join(hd(Groups), Pid) end, Members) end),
{SelectMicros, _} =
timer:tc(
fun () -> lists:foreach(fun (_) -> true = is_pid(pg2:get_closest_pid(hd(Groups))) end, Members) end),
%% 3) 後始末
lists:foreach(fun pg2:delete/1, Groups),
lists:foreach(fun (Pid) -> unlink(Pid), exit(Pid, kill) end, Members),
[
{create, CreateMicros / GroupCount},
{join, JoinMicros / MemberCount},
{select, SelectMicros / MemberCount}
].
%%% External Functions: Scale Up Benchmarks
%% @doc PID指定によるメッセージ送受信のスケールアップ性能を測定するためのベンチマーク関数
-spec pid_msg_echo_scaleup_bench(non_neg_integer(), non_neg_integer(), [node()]) -> AverageEchoTime::microseconds().
pid_msg_echo_scaleup_bench(EchoCount, SchedulersOnline, Nodes) ->
%% 1) 利用可能なスケジューラ数(≒ CPUコア数)を制限する
OldSchedulersOnline = erlang:system_flag(schedulers_online, SchedulersOnline),
%% 2) 自ノードにechoサーバ群を起動する
EchoServerCount = SchedulersOnline * 4, % 利用可能なスケジューラ数の四倍のプロセスを起動しておく
EchoServers = [spawn_link(?MODULE, echo_server_start, [undefined]) || _ <- lists:seq(1, EchoServerCount)],
%% 3) `Nodes'上で、echoクライアント群を実行し、全ての処理が完了するまでの所要時間を計測する
PerNodeEchoCount = EchoCount div length(Nodes), % NOTE: 剰余分が捨てられてしまう
RealEchoCount = PerNodeEchoCount * length(Nodes),
{MicroSeconds, _} =
timer:tc(
fun () ->
EchoClients = [monitor(process, spawn(Node, ?MODULE, do_echo, [PerNodeEchoCount, EchoServers])) || Node <- Nodes],
wait_downs(length(EchoClients)) % 全てのechoクライアントが終了するの待つ
end),
%% 4) 後始末
lists:foreach(fun (Pid) -> unlink(Pid), exit(Pid, kill) end, EchoServers),
SchedulersOnline = erlang:system_flag(schedulers_online, OldSchedulersOnline),
MicroSeconds / RealEchoCount.
%% @doc PID指定によるメッセージ送受信のスケールアップ性能を測定するためのベンチマーク関数
-spec rpc_call_scaleup_bench(non_neg_integer(), non_neg_integer(), [node()]) -> AverageCallTime::microseconds().
rpc_call_scaleup_bench(CallCount, SchedulersOnline, Nodes) ->
%% 1) 利用可能なスケジューラ数(≒ CPUコア数)を制限する
OldSchedulersOnline = erlang:system_flag(schedulers_online, SchedulersOnline),
%% 3) `Nodes'上で、echoクライアント群を実行し、全ての処理が完了するまでの所要時間を計測する
PerNodeCallCount = CallCount div length(Nodes), % NOTE: 剰余分が捨てられてしまう
RealCallCount = PerNodeCallCount * length(Nodes),
{MicroSeconds, _} =
timer:tc(
fun () ->
CallClients = [monitor(process, spawn(Node, ?MODULE, do_rpc_call, [PerNodeCallCount, [node()]])) || Node <- Nodes],
wait_downs(length(CallClients)) % 全てのクライアントが終了するの待つ
end),
%% 4) 後始末
SchedulersOnline = erlang:system_flag(schedulers_online, OldSchedulersOnline),
MicroSeconds / RealCallCount.
%%% Internal Functions
%% @doc `Servers'で指定されたプロセス群との間で、合計で`EchoCount'個のメッセージをやりとりする
-spec do_echo(non_neg_integer(), [Server]) -> ok when Server :: pid() | {atom(), node()}.
do_echo(EchoCount, Servers) ->
send_messages(EchoCount, Servers, Servers), % echoメッセージを送信する
wait_messages(EchoCount). % echobackメッセージを待つ (NOTE: 現状はメッセージの中身には考慮していない)
-spec send_messages(non_neg_integer(), [Server], [Server]) -> ok when Server :: pid() | {atom(), node()}.
send_messages(0, _, _) -> ok;
send_messages(N, [], Servers) -> send_messages(N, Servers, Servers); % 一巡したので最初に戻る
send_messages(N, [S | Rest], Servers) ->
S ! {self(), N},
send_messages(N - 1, Rest, Servers).
%% @doc `Count'個メッセージを受信するまで待機する (メッセージの内容には関知しない)
-spec wait_messages(Count::non_neg_integer()) -> ok.
wait_messages(0) -> ok;
wait_messages(N) -> receive _ -> wait_messages(N - 1) end.
-spec wait_downs(Count::non_neg_integer()) -> ok.
wait_downs(0) -> ok;
wait_downs(N) -> receive {'DOWN', _, _, _, _} -> wait_downs(N - 1) end.
%% @doc `Nodes'で指定されたノード群に対して、合計で`CallCount'回のRPC呼び出しを行なう
-spec do_rpc_call(non_neg_integer(), [node()]) -> ok.
do_rpc_call(CallCount, Nodes) ->
rpc_async_calls(CallCount, Nodes, Nodes),
wait_messages(CallCount). % RPCのレスポンスを待つ (NOTE: 現状はメッセージの中身には考慮していない)
-spec rpc_async_calls(non_neg_integer(), [node()], [node()]) -> ok.
rpc_async_calls(0, _, _) -> ok;
rpc_async_calls(N, [], Nodes) -> rpc_async_calls(N, Nodes, Nodes);% 一巡したので最初に戻る
rpc_async_calls(N, [Dst | Rest], Nodes) ->
rpc:async_call(Dst, ?MODULE, make_echo_msg, [N]),
rpc_async_calls(N - 1, Rest, Nodes).
%% @doc エコーサーバを起動する
-spec echo_server_start(Name) -> no_return() when Name :: atom() | undefined.
echo_server_start(undefined) ->
echo_server_loop();
echo_server_start(Name) ->
true = register(Name, self()), % `Name =/= undefined'なら名前付きプロセスとして起動する
echo_server_loop().
-spec echo_server_loop() -> no_return().
echo_server_loop() ->
receive
{From, Msg} ->
From ! make_echo_msg(Msg),
echo_server_loop()
end.
-spec make_echo_msg(term()) -> term().
make_echo_msg(Msg) ->
{self(), Msg}.
%% 使用例:
%%
%% スレーブノードを起動したいホスト名とノードの名前を指定して slave:start_link/2 を呼び出す
%% ※ ここで指定するホストにはrshコマンドでログインができる必要がある
%% (rshコマンドではなく、sshコマンドを使いたい場合はerlコマンドの実行時に'-rsh ssh'オプションを引数で指定する)
> slave:start_link(hoge_host, fuga).
{ok, fuga@hoge_host} % 'fuga@hoge_host'というスレーブノードが起動した
> nodes().
[fuga@hoge_host]
> exit(shutdown). % 呼び出し元(link)プロセスがダウンしたら、スレーブノードは自動で終了させられる
> nodes().
[]
%% [1] dist_bench:setup_nodes/1
%% スレーブノード群の起動関数
> NodeSpecs = [{host1, 3}, {host2, 2}]. % スレーブノードを起動するホストと、そのホストでの起動数を指定する
> dist_bench:setup_nodes(NodeSpecs).
{5, 654112}. % {起動したスレーブノードの数, 各ノードの起動に要した時間の平均(μs)}
> nodes().
[slave_0@host1, slave_1@host1, slave_2@host1, slave_0@host2, slave_1@host2]
> exit(shutdown).
%% [2] dist_bench:make_node_specs/2
%% 'NodeSpecs'の生成補助関数
%% 利用可能なホスト一覧と、スレーブノードの合計起動数を指定すると、各ホスト間で起動数が均等になるように調整してくれる
> dist_bench:make_node_specs([host1, host2], 5).
[{host1, 3}, {host2, 2}]
%% [3] dist_bench:do_bench/3
%% ベンチマーク(計測)実行関数
> ExecuteCount = 5. % 'BenchFun'の実行回数
> BenchFun = fun () -> timer:sleep(100) end. % 具体的な計測処理を実行する関数
> dist_bench:do_bench(NodeSpecs, ExecuteCount, BenchFun).
# setup nodes ... done: count=5, start_avg_time=0.3468948 sec % 最初にスレーブノード群が起動させられる
# [1] do bench ... done: time=0.103409 sec % 'BenchFun'の一回の実行に掛かった時間(前処理や後処理の時間も含むので参考程度)
# [2] do bench ... done: time=0.100309 sec
# [3] do bench ... done: time=0.101726 sec
# [4] do bench ... done: time=0.1009 sec
# [5] do bench ... done: time=0.100786 sec
[ok,ok,ok,ok,ok]
% ↑ 'BenchFun'関数の返り値を集めたもの.
% ここでは'ok'が返されているが、実際の対象処理の計時結果が返されることを期待している.
% 以降では、このリスト中の中央値が、最終的な計時結果として採用されている。
%% 測定方法:
%% - dist_bench:setup_nodes/1関数を使って、スレーブノードの追加に要した時間の平均値を測定する
> Hosts = [host1, host2, host3, host4, host5, host6, host7, host8, host9]. % スレーブ用のEC2インスタンスリスト
> NodeCount = 1. % 起動するスレーブノードの数。1から256の範囲で変化させて、それぞれの値での追加に要した時間を計測する
> dist_bench:setup_nodes(dist_bench:make_node_specs(Hosts, NodeCount)).
{1, 647250.0} % 返り値の第二要素が、各ノードの追加(起動)に要した時間の平均値(μs)
> exit(shutdown). % 'NodeCount'を変更して次の想定を行なう前に、起動したスレーブノード群を停止する
%% 実行例:
%% スレーブノード数=4、でメッセージ送信ベンチマークを実行h
> dist_bench:do_bench(dist_bench:make_node_specs(Hosts, 4), 5,
fun () -> dist_bench:pid_msg_echo_bench(500000, nodes()) end).
%% スレーブノード数=4, でRPCベンチマークを実行
> dist_bench:do_bench(dist_bench:make_node_specs(Hosts, 4), 5,
fun () -> dist_bench:rpc_call_bench(100000, nodes()) end).
%% スレーブノード数=4, でプロセス監視ベンチマークを実行
> dist_bench:do_bench(dist_bench:make_node_specs(Hosts, 4), 5,
fun () -> dist_bench:monitor_bench(100000, nodes()) end).
%% 実行例
%% スレーブノード数=16 で、globalモジュールのベンチマークを実行
> dist_bench:do_bench(dist_bench:make_node_specs(Hosts, 16), 5, fun () -> dist_bench:global_bench(100) end).
[[{register,2349.97},{unregister,3246.77},{whereis,0.47}], % 登録/解除/検索、処理の一回辺りの平均所要時間(μs)
[{register,2509.63},{unregister,2608.14},{whereis,0.49}],
[{register,2420.48},{unregister,3952.45},{whereis,0.59}],
[{register,2449.36},{unregister,2469.99},{whereis,0.41}],
[{register,2647.81},{unregister,2469.52},{whereis,0.44}]]
%% スレーブノード数=16 で、pg2モジュールのベンチマークを実行
> dist_bench:do_bench(dist_bench:make_node_specs(Hosts, 16), 5, fun () -> dist_bench:pg2_bench(100, 100) end).
[[{create,1787.34},{join,1735.18},{select,78.39}], % 作成/メンバ追加/選択、処理の一回辺りの平均所要時間(μs)
[{create,1713.19},{join,1704.63},{select,79.92}],
[{create,1725.01},{join,2014.12},{select,77.63}],
[{create,1764.74},{join,1721.5},{select,78.14}],
[{create,1719.31},{join,1704.02},{select,76.69}]]
%% 実行例:
%% 使用CPUコア数=8 で、RPCのスケールアップ度測定関数を実行する
> dist_bench:do_bench(dist_bench:make_node_specs(Hosts, 9), 5,
fun () -> dist_bench:rpc_call_scaleup_bench(400000, 8, nodes()) end)
%% 使用CPUコア数=8 で、メッセージ送受信(+ echoサーバプロセス複製)のスケールアップ度測定関数を実行する
> dist_bench:do_bench(dist_bench:make_node_specs(Hosts, 9), 5,
fun () -> dist_bench:pid_msg_echo_scaleup_bench(400000, 8, nodes()) end).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment