Skip to content

Instantly share code, notes, and snippets.

@vinoski
Created November 2, 2012 19:39
Show Gist options
  • Save vinoski/801234742f94cadf553a to your computer and use it in GitHub Desktop.
Save vinoski/801234742f94cadf553a to your computer and use it in GitHub Desktop.
erlang scheduler anomaly monitor
%% -------------------------------------------------------------------
%%
%% schedmon: periodically restart Erlang schedulers based on CPU load
%%
%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% -------------------------------------------------------------------
-module(schedmon).
-behaviour(gen_server).
-export([start_link/0, start/0, stop/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% This gen_server tries to detect an anomaly where a subset of Erlang
%% schedulers end up handling all the load of the system and the other
%% schedulers appear to be suspended or asleep. The code examines the
%% CPU load of the Erlang VM process and the load of each individual CPU
%% core, and if it detects a condition where a small subset of the cores
%% are heavily loaded while the other cores are not, it temporarily takes
%% the majority of Erlang schedulers offline and then brings them back
%% online, which as we've observed in practice clears up the imbalance
%% of load across Erlang schedulers.
%% SAMPLE_RATE controls how often the CPU load is sampled to try to detect
%% offline schedulers.
-define(SAMPLE_RATE, 150*1000). % 2.5 minutes, in milliseconds
%% LOW_CPU_RANGE and HI_CPU_RANGE bound the range within which we might
%% detect the offline scheduler anomaly. The range reflects the CPU
%% percentage used by the current process. HI_CPU_RANGE(Schedulers) is a
%% quarter of Schedulers*100. The macro bodies are parenthesized so that an
%% expression argument, or use inside a larger expression, keeps its meaning.
-define(LOW_CPU_RANGE, 80).
-define(HI_CPU_RANGE(Schedulers), ((Schedulers)*100 div 4)).
%% CORE_LOW_LOAD and CORE_HI_LOAD are the values used to determine whether
%% a CPU core is loaded or not. They refer to percentage CPU load. If a
%% core has a load lower than CORE_LOW_LOAD it qualifies as not being
%% loaded. If a core has a load higher than CORE_HI_LOAD it qualifies as
%% being loaded.
-define(CORE_LOW_LOAD, 5).
-define(CORE_HI_LOAD, 95).
%% MAX_CORE_LOAD_RATIO and MIN_CORE_NOLOAD_RATIO refer to the ratio of CPU
%% cores that are loaded and not loaded, respectively, under the conditions
%% of the offline scheduler anomaly. For example, in an 8-core system we
%% look for one or two cores to be loaded greater than CORE_HI_LOAD and the
%% other six to have loads lower than CORE_LOW_LOAD.
-define(MAX_CORE_LOAD_RATIO, 0.25).
-define(MIN_CORE_NOLOAD_RATIO, (1-?MAX_CORE_LOAD_RATIO)).
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-endif.
%% {AggregateTicks, VmTicks}: AggregateTicks is the sum of every counter on
%% the aggregate "cpu" line of /proc/stat; VmTicks is the value returned by
%% pid_cpu_info/0 (utime + stime of this VM's OS process).
-type total_pid_cpu() :: {integer(),integer()}.
%% {CpuNumber, TotalTicks, IdleTicks} for one "cpuN" line of /proc/stat.
%% CpuNumber is the digit string following "cpu", TotalTicks the sum of the
%% line's counters, and IdleTicks its 4th counter (idle).
-type per_cpu() :: {string(),integer(),integer()}.
-type all_cpus() :: [per_cpu()].
%% Possible results of file:read_line/1 while scanning /proc/stat.
-type proc_stat_read() :: {ok, string()} | eof | {error, any()}.
-record(state, {
schedulers :: integer(),   % schedulers_online captured in init/1
regex :: tuple(),          % compiled "^cpu(\\d+)*\\s+" pattern
total :: total_pid_cpu(),  % aggregate/VM counters from the previous sample
all_cpus :: all_cpus()     % per-CPU counters from the previous sample
}).
%% Start the monitor, registered locally under the module name and linked
%% to the calling process.
start_link() ->
    gen_server:start_link(server_name(), ?MODULE, [], []).

%% Start the monitor, registered locally under the module name, without a
%% link to the caller.
start() ->
    gen_server:start(server_name(), ?MODULE, [], []).

%% Ask the registered monitor to shut down. This is asynchronous and always
%% returns ok, whether or not the server is running.
stop() ->
    gen_server:cast(?MODULE, stop).

%% Registration name shared by start_link/0 and start/0.
server_name() ->
    {local, ?MODULE}.
%% gen_server init callback. The monitor depends on Linux's /proc/stat. When
%% that file cannot be stat'ed, init returns `ignore` and no server runs.
%% Otherwise it compiles the CPU-line pattern, records the current number of
%% online schedulers, and arms the first `check` tick.
init([]) ->
    case file:read_file_info("/proc/stat") of
        {ok, _FileInfo} ->
            %% Group 1 captures the CPU number on "cpuN" lines. On the
            %% aggregate "cpu" line the group does not take part in the match.
            {ok, CpuLineRe} = re:compile("^cpu(\\d+)*\\s+"),
            Online = erlang:system_info(schedulers_online),
            erlang:send_after(?SAMPLE_RATE, self(), check),
            InitialState = #state{schedulers = Online,
                                  regex = CpuLineRe,
                                  total = {0, 0},
                                  all_cpus = []},
            {ok, InitialState};
        _NoProcStat ->
            ignore
    end.
%% The server offers no synchronous API. Every call is answered with `ok`
%% and the state is left untouched.
handle_call(_Req, _Caller, St) ->
    Reply = ok,
    {reply, Reply, St}.
%% `stop` (sent by stop/0) terminates the server with reason `normal`.
%% Every other cast is ignored.
handle_cast(stop, St) ->
    {stop, normal, St};
handle_cast(_Unexpected, St) ->
    {noreply, St}.
%% Periodic `check` tick. Reads the current /proc counters, compares them
%% with the snapshot stored on the previous tick, resets the schedulers if
%% the load pattern matches the anomaly, stores the new snapshot and
%% re-arms the timer.
%%
%% init/1 seeds the state with total = {0,0} and all_cpus = [], so the first
%% tick has nothing real to diff against. Diffing against zero would measure
%% averages since boot rather than over the last ?SAMPLE_RATE interval, and
%% could trigger a spurious reset. That tick therefore only records a
%% baseline. Any other message is ignored.
handle_info(check, #state{schedulers=Scheds, regex=Regex, total=Total,
                          all_cpus=OldAllCpus}=State) ->
    {NewTotal, NewAllCpus} = parse_cpu_lines(Regex, Total, OldAllCpus),
    case Total of
        {0, 0} ->
            %% No previous sample yet: the new one becomes the baseline.
            ok;
        _ ->
            check_for_anomaly(Scheds, Total, NewTotal, OldAllCpus, NewAllCpus)
    end,
    erlang:send_after(?SAMPLE_RATE, self(), check),
    {noreply, State#state{total=NewTotal, all_cpus=NewAllCpus}};
handle_info(_Info, State) ->
    {noreply, State}.

%% Compare two consecutive samples. If the VM's CPU usage and the per-core
%% load distribution both fall in the anomaly window, reset the schedulers.
%% A pair of samples whose aggregate counter did not advance is skipped,
%% because the usage percentage would otherwise divide by zero.
-spec check_for_anomaly(integer(), total_pid_cpu(), total_pid_cpu(),
                        all_cpus(), all_cpus()) -> ok.
check_for_anomaly(_Scheds, {OldAll, _}, {NewAll, _}, _OldAllCpus, _NewAllCpus)
  when NewAll =< OldAll ->
    ok;
check_for_anomaly(Scheds, {OldAll, OldPid}, {NewAll, NewPid},
                  OldAllCpus, NewAllCpus) ->
    %% Share of all CPU ticks spent by this VM, scaled by Scheds*100.
    TotalCPU = Scheds * 100 * (NewPid - OldPid) / (NewAll - OldAll),
    case cpu_in_target_range(Scheds, TotalCPU) of
        false ->
            ok;
        true ->
            case cores_in_target_range(Scheds, OldAllCpus, NewAllCpus) of
                {true, SchedsNotLoaded} ->
                    reset_schedulers(Scheds, SchedsNotLoaded);
                false ->
                    ok
            end
    end.
%% Nothing to clean up on shutdown.
terminate(_Why, _LastState) ->
    ok.
%% Hot code upgrade: the state is carried over unchanged.
code_change(_FromVsn, OldState, _UpgradeExtra) ->
    {ok, OldState}.
%% internal functions
%% Read the "cpu" lines of /proc/stat and turn them into counters.
%%   Regex       - the pattern compiled in init/1.
%%   Total       - returned unchanged if /proc/stat has no aggregate line.
%%   _PrevCpus   - the previous per-CPU snapshot. It is NOT carried over:
%%                 a fresh list is built on every call, so a CPU that has
%%                 disappeared from /proc/stat (e.g. taken offline) does not
%%                 linger with frozen counters that would later give a
%%                 zero tick delta.
%% Returns {{AggregateTicks, VmTicks}, PerCpu}.
-spec parse_cpu_lines(tuple(), total_pid_cpu(), all_cpus()) ->
                             {total_pid_cpu(), all_cpus()}.
parse_cpu_lines(Regex, Total, _PrevCpus) ->
    {ok, CpuLines} = get_cpu_info(),
    parse_cpu_lines(CpuLines, Regex, Total, []).

%% Fold over the raw lines. The aggregate "cpu" line matches without a
%% capture group; "cpuN" lines additionally capture N.
parse_cpu_lines([], _Regex, Total, AllCpus) ->
    {Total, AllCpus};
parse_cpu_lines([CpuLine | CpuLines], Regex, Total, AllCpus) ->
    case re:run(CpuLine, Regex) of
        {match, [{0, Len}]} ->
            SysTotal = lists:sum(cpu_line_counters(CpuLine, Len)),
            parse_cpu_lines(CpuLines, Regex, {SysTotal, pid_cpu_info()}, AllCpus);
        {match, [{0, Len}, {Offset, KLen}]} ->
            Nums = cpu_line_counters(CpuLine, Len),
            Key = string:substr(CpuLine, Offset + 1, KLen),
            %% The 4th counter on a /proc/stat cpu line is idle time.
            Entry = {Key, lists:sum(Nums), lists:nth(4, Nums)},
            parse_cpu_lines(CpuLines, Regex, Total,
                            lists:keystore(Key, 1, AllCpus, Entry))
    end.

%% Parse the whitespace-separated integer counters that follow the first Len
%% characters (the matched "cpu"/"cpuN" prefix) of a /proc/stat line.
-spec cpu_line_counters(string(), non_neg_integer()) -> [integer()].
cpu_line_counters(CpuLine, Len) ->
    Fields = re:split(string:substr(CpuLine, Len + 1), "\\s+", [{return, list}]),
    [list_to_integer(F) || F <- Fields, F =/= []].
%% Return, in file order, every line of /proc/stat that starts with "cpu"
%% (the aggregate line followed by one line per CPU), as read by
%% file:read_line/1. Failing to open the file crashes the caller. A read
%% error part-way through is returned as {error, Reason}. The file is
%% closed in every case.
-spec get_cpu_info() -> {ok, [string()]} | {error, any()}.
get_cpu_info() ->
    {ok, F} = file:open("/proc/stat", [read,raw]),
    try
        get_cpu_info(F, file:read_line(F), [])
    after
        file:close(F)
    end.

%% Scan loop. The second argument is the result of the previous
%% file:read_line/1 call; Acc holds the matching lines in reverse order.
%% The possible results are only {ok, Lines} or {error, Reason}; the old
%% spec also listed `ignore`, which no clause returns.
-spec get_cpu_info(file:io_device(), proc_stat_read(), [string()]) ->
                          {ok, [string()]} | {error, any()}.
get_cpu_info(_, eof, Acc) ->
    {ok, lists:reverse(Acc)};
get_cpu_info(_, {error, _}=Error, _) ->
    Error;
get_cpu_info(F, {ok, "cpu"++_=Str}, Acc) ->
    get_cpu_info(F, file:read_line(F), [Str|Acc]);
get_cpu_info(F, {ok, _}, Acc) ->
    get_cpu_info(F, file:read_line(F), Acc).
%% Return the CPU time consumed so far by this VM's OS process:
%% utime + stime from /proc/<os pid>/stat (clock ticks, per proc(5)).
%% The file is closed even if reading it fails.
-spec pid_cpu_info() -> integer().
pid_cpu_info() ->
    ProcPidStat = filename:join(["/proc", os:getpid(), "stat"]),
    {ok, F} = file:open(ProcPidStat, [read, raw]),
    Line = try
               {ok, StatLine} = file:read_line(F),
               StatLine
           after
               file:close(F)
           end,
    pid_stat_cpu(Line).

%% Sum utime (field 14) and stime (field 15) of a /proc/<pid>/stat line.
%% Field 2 is the command name in parentheses, and it may itself contain
%% spaces or ')'. Only the text after the LAST ')' is therefore tokenized.
%% There, field 3 (state) is token 1, so utime and stime are tokens 12
%% and 13.
-spec pid_stat_cpu(string()) -> integer().
pid_stat_cpu(Line) ->
    AfterComm = string:substr(Line, string:rstr(Line, ")") + 1),
    Tokens = string:tokens(AfterComm, " \t\n"),
    list_to_integer(lists:nth(12, Tokens)) + list_to_integer(lists:nth(13, Tokens)).
%% True when the VM's CPU usage figure (as computed by the caller) lies in
%% the closed window [?LOW_CPU_RANGE, ?HI_CPU_RANGE(Scheds)] where the
%% offline-scheduler anomaly is looked for.
-spec cpu_in_target_range(integer(), float()) -> boolean().
cpu_in_target_range(Scheds, TotalCPU) ->
    TotalCPU >= ?LOW_CPU_RANGE andalso TotalCPU =< ?HI_CPU_RANGE(Scheds).
%% Classify every core in NewAllCpus as busy, idle or neither (see
%% core_load/3), relative to the previous snapshot OldAllCpus. Both counts
%% are expressed as a share of Scheds. The result is {true, IdleCount} when
%% at least one and at most ?MAX_CORE_LOAD_RATIO of them are busy while at
%% least ?MIN_CORE_NOLOAD_RATIO are idle; otherwise it is false.
-spec cores_in_target_range(integer(), all_cpus(), all_cpus()) ->
                                   {true, integer()} | false.
cores_in_target_range(Scheds, OldAllCpus, NewAllCpus) ->
    Classify = fun(Core, Counts) -> core_load(OldAllCpus, Core, Counts) end,
    {Busy, Idle} = lists:foldl(Classify, {0, 0}, NewAllCpus),
    BusyShare = Busy / Scheds,
    IdleShare = Idle / Scheds,
    Anomalous = BusyShare > 0
        andalso BusyShare =< ?MAX_CORE_LOAD_RATIO
        andalso IdleShare >= ?MIN_CORE_NOLOAD_RATIO,
    case Anomalous of
        true -> {true, Idle};
        false -> false
    end.
%% Fold step for cores_in_target_range/3. It compares core K's counters with
%% its entry in OldAllCpus (a missing entry counts as {0,0}) and bumps the
%% Loaded count if usage exceeds ?CORE_HI_LOAD, or the NotLoaded count if
%% usage is below ?CORE_LOW_LOAD. Otherwise the accumulator is unchanged.
-spec core_load(all_cpus(), per_cpu(), {integer(),integer()}) ->
                       {integer(),integer()}.
core_load(OldAllCpus, {K, T, I}, Acc) ->
    {OldT, OldI} = case lists:keyfind(K, 1, OldAllCpus) of
                       false -> {0, 0};
                       {K, PrevT, PrevI} -> {PrevT, PrevI}
                   end,
    classify_core(T - OldT, I - OldI, Acc).

%% Classify one core from its total and idle tick deltas. A core whose total
%% counter did not advance (e.g. an offline CPU with stale counters) carries
%% no usage information. It is counted as neither loaded nor idle instead of
%% crashing on a division by zero.
-spec classify_core(integer(), integer(), {integer(),integer()}) ->
                           {integer(),integer()}.
classify_core(TDiff, _IDiff, Acc) when TDiff =< 0 ->
    Acc;
classify_core(TDiff, IDiff, {L, NL} = Acc) ->
    Usage = 100 * (TDiff - IDiff) / TDiff,
    if
        Usage < ?CORE_LOW_LOAD -> {L, NL + 1};
        Usage > ?CORE_HI_LOAD -> {L + 1, NL};
        true -> Acc
    end.
%% Briefly take SchedsToReset schedulers offline, wait one second, then
%% bring the previous number back online.
-spec reset_schedulers(integer(), integer()) -> ok.
reset_schedulers(Scheds, SchedsToReset) ->
    reset_schedulers(Scheds, SchedsToReset, 1000).

%% Same as reset_schedulers/2, with the pause (in milliseconds) between
%% taking schedulers offline and restoring them given explicitly.
-spec reset_schedulers(integer(), integer(), non_neg_integer()) -> ok.
reset_schedulers(Scheds, SchedsToReset, PauseMs) ->
    %% schedulers_online must stay >= 1. SchedsToReset is a count of idle
    %% CPU cores, which can reach or exceed Scheds when fewer schedulers
    %% than cores are configured.
    NewScheds = max(1, Scheds - SchedsToReset),
    %% system_flag/2 returns the value in effect before the change. Restore
    %% exactly that value rather than asserting it equals Scheds: a failed
    %% assertion at this point would crash with schedulers left offline.
    Previous = erlang:system_flag(schedulers_online, NewScheds),
    timer:sleep(PauseMs),
    erlang:system_flag(schedulers_online, Previous),
    ok.
-ifdef(TEST).
-endif.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment