Created
June 29, 2013 09:03
-
-
Save cdahlqvist/5890454 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-module(robin_handoff). | |
-export([start/0,start/1,start/2,stop/0]). | |
-export([step/0]). | |
%% see if a handoff is in progress | |
active_count(Node) when is_atom(Node) -> | |
Receivers=rpc:call(Node,supervisor,count_children,[riak_core_handoff_receiver_sup]), | |
Senders=rpc:call(Node,supervisor,count_children,[riak_core_handoff_sender_sup]), | |
lager:log(debug,self(),"Check Active: ~p~n~p~n~p",[Node,Receivers,Senders]), | |
case is_list(Receivers) andalso proplists:is_defined(active,Receivers) andalso is_list(Senders) andalso proplists:is_defined(active,Senders) of | |
true -> | |
ActiveReceivers=proplists:get_value(active,Receivers), | |
ActiveSenders=proplists:get_value(active,Senders), | |
ActiveReceivers + ActiveSenders; | |
false -> | |
lager:log(info,self(),"invalid response from ~p",Node), | |
reload | |
end; | |
active_count([Node | Rest]) -> | |
active_count(Node) + active_count(Rest); | |
active_count([]) -> 0; | |
active_count(Node) -> | |
lager:log(info,self(),"Handoff rotation enountered invalid node: ~p.",[Node]), | |
0. | |
%% advance to next pair | |
loop(Delay,[{Node1,Node2}|Rest], NodeList,Concurrency) -> | |
receive | |
tick -> | |
%% disable any new handoffs starting | |
rpc:multicall(NodeList,application,set_env,[riak_core,disable_inbound_handoff,true]), | |
rpc:multicall(NodeList,application,set_env,[riak_core,disable_outbound_handoff,true]), | |
case active_count(NodeList) of | |
0 -> | |
%% if no handoffs active, move on to the next pair | |
rpc:multicall(NodeList,riak_core_handoff_manager,set_concurrency,[0]), | |
rpc:multicall([Node1,Node2],application,set_env,[riak_core,disable_inbound_handoff,false]), | |
rpc:multicall([Node1,Node2],application,set_env,[riak_core,disable_outbound_handoff,false]), | |
rpc:multicall([Node1,Node2],riak_core_handoff_manager,set_concurrency,[Concurrency]), | |
lager:log(info,self(),"handoff_concurrency changed: ~p",[[{Node,rpc:call(Node,application,get_env,[riak_core,handoff_concurrency])} || Node <- NodeList]]), | |
erlang:send_after(Delay,self(),tick), | |
loop(Delay,Rest ++ [{Node1,Node2}],NodeList,Concurrency); | |
reload -> | |
lager:log(info,self(),"Reloading member list"), | |
{ok,Ring} = riak_core_ring_manager:get_my_ring(), | |
Members = riak_core_ring:all_members(Ring), | |
case length(Members) >= 2 of | |
true -> | |
{Left,Right} = lists:splitwith(fun(A) -> A =/= {Node1,Node2} end, buildpairs(Members)), | |
%%immediate tick to try again | |
erlang:send_after(1000,self(),tick), | |
loop(Delay, Right ++ Left, Members, Concurrency); | |
false -> | |
lager:log(info,self(),"Stopping round-robin handoff: Need at least 2 cluster members to handoff") | |
end; | |
_ -> | |
%% don't interrupt handoff in progess, wait 1 minute | |
erlang:send_after(60000,self(),tick) | |
end; | |
_ -> | |
lager:log(info,self(),"Stopping round-robin handoff."), | |
rpc:multicall(NodeList,application,set_env,[riak_core,disable_inbound_handoff,true]), | |
rpc:multicall(NodeList,application,set_env,[riak_core,disable_outbound_handoff,true]), | |
rpc:multicall(NodeList,riak_core_handoff_manager,set_concurrency,[0]), | |
unregister(?MODULE) | |
end. | |
%% build list of all possible 2-node combinations | |
buildpairs([ Node1, Node2 ]) -> | |
[{Node1, Node2}]; | |
buildpairs([Node | Rest]) -> | |
[{Node,Node2} || Node2 <- Rest] ++ buildpairs(Rest). | |
%% start(Delay) begin round-robin concurrency, advance after Delay | |
%% minutes - default 1 hour | |
start() -> | |
start(60,2). | |
start(N) -> | |
start(N,2). | |
start(Delay,Concurrency) -> | |
case whereis(?MODULE) of | |
undefined -> | |
lager:log(info,self(),"Starting round-robin handoff_concurrency of ~p every ~p minutes.",[Concurrency, Delay]), | |
{ok,Ring} = riak_core_ring_manager:get_my_ring(), | |
Members = riak_core_ring:all_members(Ring), | |
if | |
length(Members) > 1 -> | |
register(?MODULE, | |
spawn(fun() -> loop(Delay * 60000, | |
buildpairs(Members),Members,Concurrency) | |
end)), | |
erlang:send_after(Delay,?MODULE,tick); | |
true -> {error, "Need at least 2 cluster members to handoff"} | |
end; | |
_ -> {error, "Already Started"} | |
end. | |
stop() -> | |
case whereis(?MODULE) of | |
undefined -> | |
{error, "Not running"}; | |
Pid -> | |
Pid ! stop | |
end. | |
step() -> | |
whereis(?MODULE) ! tick. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment