Created
February 18, 2011 16:21
-
-
Save RJ/833905 to your computer and use it in GitHub Desktop.
gen_server:call balancer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
%%
%% Simple pool for gen_servers that only use :call
%% @author RJ <rj@metabrew.com>
%%
-module(gen_server_call_pool).
-behaviour(gen_server).

-include("irc.hrl").

%% --------------------------------------------------------------------
%% Public API
-export([start_link/3, stats/1, forcetimeout/1]).

%% gen_server callbacks
-export([
    init/1,
    handle_call/3,
    handle_cast/2,
    handle_info/2,
    terminate/2,
    code_change/3
]).
%% Compile-time switch for the forced-timeout debugging aid.
-define(ENABLE_DEBUG, false).
%% A dequeued job that waited longer than this (in ms) triggers a log line.
-define(QWAIT_WARNING, 100).
%% A queue longer than this at dequeue time triggers a log line.
-define(QSIZE_WARNING, 25).
%% Pool server state.
-record(state, {
    ready,  % list of idle worker pids
    busy,   % list of worker pids currently serving a request
    work,   % FIFO queue of #job{} records waiting for a worker
    name,   % name this gen_server is registered as
    m,      % module of the worker start function
    f,      % name of the worker start function
    a       % argument list for the worker start function
}).
%% A request that is waiting in the work queue for a free worker.
-record(job, {
    request,  % opaque gen_server request term, forwarded to a worker as-is
    from,     % opaque gen_server reply tag of the waiting caller
    dob       % timestamp taken when the job entered the work queue
}).
%% @doc Start the pool server, registered locally as `Name'.
%% `{M,F,A}' is the worker start function (applied once per worker by init/1)
%% and `Num' is the number of workers to start.
start_link(Name, {M, F, A}, Num) ->
    InitArgs = [Name, {M, F, A}, Num],
    gen_server:start_link({local, Name}, ?MODULE, InitArgs, []).
%% @doc Synchronously ask the pool registered as `Name' for its statistics.
%% The request is tagged with ?MODULE so the pool can tell it apart from
%% requests that are meant to be forwarded to a worker.
stats(Name) ->
    StatsRequest = {?MODULE, stats},
    gen_server:call(Name, StatsRequest).
%% @doc Debugging aid: asynchronously send the `forcetimeout' cast to the
%% pool registered as `Name'.
forcetimeout(Name) ->
    gen_server:cast(Name, forcetimeout).
%% -------------------------------------------------------------------- | |
%% @doc gen_server init callback.
%% Enables exit trapping, starts `Num' workers by applying `M:F' to the
%% argument list `A' (each call must return `{ok, Pid}'), and builds the
%% initial state with every worker idle and an empty work queue.
%% NOTE(review): worker replacement in handle_info/2 relies on 'EXIT'
%% messages, so M:F presumably links the worker to the caller -- confirm.
init([Name, {M, F, A}, Num]) ->
    process_flag(trap_exit, true),
    ?INFO("Starting DB workers..",[]),
    StartWorker =
        fun() ->
            {ok, Pid} = erlang:apply(M, F, A),
            Pid
        end,
    Workers = [StartWorker() || _ <- lists:seq(1, Num)],
    ?INFO("Started ~B [~w] workers", [length(Workers), M]),
    InitialState = #state{
        ready = Workers,
        busy = [],
        work = queue:new(),
        name = Name,
        m = M,
        f = F,
        a = A
    },
    {ok, InitialState}.
%% @doc gen_server call callback.
%%
%% Three cases, tried in order:
%%   1. `{?MODULE, stats}' -- answered directly by the pool with a proplist
%%      of `num_busy', `num_ready', `jobs_queued' and `current_wait' (how
%%      long, in ms, the job at the head of the queue has been waiting;
%%      0 when the queue is empty).
%%   2. any other request while a worker is idle -- the request is handed
%%      to that worker through a short-lived proxy process; the reply is
%%      delivered later via the `send_reply' cast.
%%   3. any other request while no worker is idle -- the request is queued.
handle_call({?MODULE, stats}, _From,
            State = #state{ready = Ready, busy = Busy, work = Work}) ->
    %% queue:peek/1 returns `empty | {value, Item}'. The previous code
    %% matched queue:out/1 (which returns `{{value, Item}, Q}') against
    %% `{value, Item}' and crashed with case_clause on a non-empty queue.
    CurrentWait =
        case queue:peek(Work) of
            empty ->
                0;
            {value, #job{dob = Dob}} ->
                timer:now_diff(erlang:now(), Dob) / 1000
        end,
    Stats = [
        {num_busy, length(Busy)},
        {num_ready, length(Ready)},
        {jobs_queued, queue:len(Work)},
        {current_wait, CurrentWait}
    ],
    {reply, Stats, State};
%% A job arrives and a worker is available immediately.
handle_call(Request, From,
            State = #state{ready = [Worker | Idle], busy = Busy, name = Name}) ->
    %% The debug flag lives in THIS process's dictionary (set by the
    %% `forcetimeout' cast), so it must be read here: the spawned proxy has
    %% its own, empty, process dictionary.
    ForceTimeout = ?ENABLE_DEBUG == true andalso get(dotimeout) == true,
    Proxy =
        fun() ->
            Reply =
                case ForceTimeout of
                    true -> gen_server:call(Worker, Request, 1);
                    false -> gen_server:call(Worker, Request)
                end,
            %% The pool sends Reply to From and puts the worker back in the
            %% ready list when it handles this cast.
            gen_server:cast(Name, {send_reply, Worker, From, Reply})
        end,
    %% Deliberately not linked: the pool traps exits and treats every
    %% 'EXIT' message as a worker crash.
    %% NOTE(review): if the call above fails (e.g. times out) no send_reply
    %% is cast, so the worker stays in the busy list -- confirm intended.
    spawn(Proxy),
    {noreply, State#state{ready = Idle, busy = [Worker | Busy]}};
%% A job arrives and no worker is available: add it to the queue.
handle_call(Request, From, State = #state{ready = [], work = Work}) ->
    Job = #job{request = Request, from = From, dob = erlang:now()},
    {noreply, State#state{work = queue:in(Job, Work)}}.
%% @doc gen_server cast callback.
%%
%% `forcetimeout' sets the `dotimeout' flag in this process's dictionary.
%%
%% `{send_reply, Worker, To, Reply}' is sent by a proxy once a worker has
%% answered: the reply is forwarded to the waiting caller, the worker is
%% moved from the busy list back to the ready list, and the next queued
%% job (if any) is dispatched.
handle_cast(forcetimeout, State) ->
    put(dotimeout, true),
    {noreply, State};
handle_cast({send_reply, Worker, To, Reply},
            State = #state{ready = Ready, busy = Busy, work = Work}) ->
    gen_server:reply(To, Reply),
    Freed = State#state{ready = [Worker | Ready],
                        busy = lists:delete(Worker, Busy)},
    case queue:out(Work) of
        {{value, #job{request = NextRequest, from = NextFrom, dob = QueuedAt}},
         Remaining} ->
            WaitMs = timer:now_diff(erlang:now(), QueuedAt) / 1000,
            %% Queue length is measured before the job was taken out.
            Backlog = queue:len(Work),
            if
                Backlog > ?QSIZE_WARNING; WaitMs > ?QWAIT_WARNING ->
                    ?INFO("Queued job started, wait time: ~p ms, Qsize: ~p", [WaitMs, Backlog]);
                true ->
                    nop
            end,
            %% Reuse the call handler so the job takes the same dispatch
            %% path as a freshly arrived request.
            handle_call(NextRequest, NextFrom, Freed#state{work = Remaining});
        {empty, _} ->
            {noreply, Freed}
    end.
%% @doc gen_server info callback.
%%
%% `{'EXIT', Pid, Reason}' from a pool worker: the dead pid is removed from
%% the ready/busy lists and a replacement worker is started with the stored
%% M:F(A). Because the replacement is idle, the next queued job (if any) is
%% dispatched to it right away; otherwise queued jobs would sit in the queue
%% until an unrelated `send_reply' cast arrived.
%%
%% `{'EXIT', Pid, Reason}' from a pid that is not in the pool, and any other
%% message, is logged and ignored. Previously a foreign 'EXIT' started an
%% extra worker and any other message crashed the pool with function_clause.
handle_info({'EXIT', Pid, Reason},
            State = #state{m = M, f = F, a = A, ready = Ready, busy = Busy}) ->
    case lists:member(Pid, Ready) orelse lists:member(Pid, Busy) of
        true ->
            ?WARN("WORKER CRASH: ~p ~p ready:~p busy:~p", [Pid, Reason, Ready, Busy]),
            %% Start a new worker to replace the crashed one.
            {ok, W} = erlang:apply(M, F, A),
            ?INFO("Added new worker to pool to replace crashed: ~p", [W]),
            NewState = State#state{ready = [W | lists:delete(Pid, Ready)],
                                   busy = lists:delete(Pid, Busy)},
            case queue:out(NewState#state.work) of
                {empty, _} ->
                    {noreply, NewState};
                {{value, #job{request = JobRequest, from = JobFrom}}, NewWork} ->
                    %% ready is non-empty here, so this takes the immediate
                    %% dispatch clause of handle_call/3 and returns
                    %% {noreply, _}.
                    handle_call(JobRequest, JobFrom, NewState#state{work = NewWork})
            end;
        false ->
            ?WARN("Ignoring EXIT from non-pool process: ~p ~p", [Pid, Reason]),
            {noreply, State}
    end;
handle_info(Unexpected, State) ->
    ?WARN("Ignoring unexpected message: ~p", [Unexpected]),
    {noreply, State}.
%% @doc gen_server terminate callback; performs no cleanup and returns `ok'.
terminate(_Why, _PoolState) ->
    ok.
%% @doc gen_server code_change callback; the state is carried over unchanged.
code_change(_FromVsn, PoolState, _Extra) ->
    {ok, PoolState}.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment