Skip to content

Instantly share code, notes, and snippets.

@RJ
Created February 18, 2011 16:21
Show Gist options
  • Save RJ/833905 to your computer and use it in GitHub Desktop.
Save RJ/833905 to your computer and use it in GitHub Desktop.
gen_server:call balancer
%%
%% Simple pool for gen_servers that only use :call
%% @author RJ <rj@metabrew.com>
%%
-module(gen_server_call_pool).
-behaviour(gen_server).
-include("irc.hrl").
%% --------------------------------------------------------------------
%% External exports
-export([start_link/3, stats/1, forcetimeout/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(ENABLE_DEBUG, false).
-define(QWAIT_WARNING, 100). % in ms
-define(QSIZE_WARNING, 25).
-record(state, {
ready, % worker pids ready
busy, % worker pids currently busy
work, % FIFO: jobs to send to workers
name, % gen_server name to register as
m,f,a
}).
-record(job, {request, % opaque gen_server request tuple
from, % opaque gen_server reply tuple
dob % time job entered work queue
}).
start_link(Name, {M,F,A}, Num) ->
gen_server:start_link({local, Name}, ?MODULE, [Name,{M,F,A},Num], []).
stats(Name) ->
gen_server:call(Name, {?MODULE,stats}).
forcetimeout(Name) -> gen_server:cast(Name, forcetimeout).
%% --------------------------------------------------------------------
init([Name, {M,F,A}, Num]) ->
process_flag(trap_exit, true),
% start all workers:
?INFO("Starting DB workers..",[]),
Ready = lists:map(fun(_N) -> {ok, Pid} = erlang:apply(M,F,A), Pid end, lists:seq(1,Num)),
?INFO("Started ~B [~w] workers", [length(Ready),M]),
{ok, #state{
ready=Ready,
busy=[],
work=queue:new(),
name=Name,
m=M,f=F,a=A
}}.
handle_call({?MODULE,stats}, _From, State) ->
case queue:out(State#state.work) of
{empty, _} -> T = 0;
{value, #job{dob=Dob}} -> T = timer:now_diff(erlang:now(), Dob)/1000
end,
S = [
{num_busy, length(State#state.busy)},
{num_ready, length(State#state.ready)},
{jobs_queued, queue:len(State#state.work)},
{current_wait, T}
],
{reply, S, State};
% job arrives when worker available immediately
handle_call(Request, From, State=#state{ready=Ready}) when Ready /= [] ->
[Worker|NewReady] = Ready,
F = fun() ->
case ?ENABLE_DEBUG == true andalso get(dotimeout) == true of
true ->
Reply = gen_server:call(Worker, Request, 1);
_ ->
Reply = gen_server:call(Worker, Request)
end,
% this sends the reply to 'From' and puts worker back in ready list:
gen_server:cast(State#state.name, {send_reply, Worker, From, Reply})
end,
spawn(F),
NewState = State#state{ready=NewReady,busy=[Worker|State#state.busy]},
{noreply, NewState};
% job arrives, no available workers, add to queue
handle_call(Request, From, State=#state{ready=[]}) ->
Job = #job{request=Request, from=From, dob=erlang:now()},
NewWork = queue:in(Job, State#state.work),
%?INFO("Job queue size: ~p, num workers: ~B", [queue:len(NewWork), length(State#state.busy)]),
NewState = State#state{work = NewWork},
{noreply, NewState}.
handle_cast(forcetimeout, State) ->
put(dotimeout, true),
{noreply, State};
% worker finished and sends response, dispatch next job, if any waiting
handle_cast({send_reply, Worker, To, Reply}, State) ->
gen_server:reply(To, Reply),
NewReady = [Worker | State#state.ready],
NewBusy = lists:delete(Worker, State#state.busy),
NewState = State#state{ready=NewReady, busy=NewBusy},
case queue:out(State#state.work) of
{empty, _} ->
{noreply, NewState};
{{value, #job{request=JobRequest, from=JobFrom, dob=Dob}}, NewWork} ->
T = timer:now_diff(erlang:now(), Dob)/1000,
QSize = queue:len(State#state.work),
case QSize > ?QSIZE_WARNING orelse T > ?QWAIT_WARNING of
true ->
?INFO("Queued job started, wait time: ~p ms, Qsize: ~p", [T, QSize]);
false ->
nop
end,
NewState2 = NewState#state{work=NewWork},
handle_call(JobRequest, JobFrom, NewState2)
end.
handle_info({'EXIT', Pid, Reason}, State = #state{m=M,f=F,a=A}) ->
?WARN("WORKER CRASH: ~p ~p ready:~p busy:~p",[Pid, Reason,State#state.ready, State#state.busy]),
R = lists:delete(Pid, State#state.ready),
B = lists:delete(Pid, State#state.busy),
% spawn a new worker to replace the crashed one
{ok, W} = erlang:apply(M,F,A),
?INFO("Added new worker to pool to replace crashed: ~p", [W]),
NewState = State#state{ready=[W|R],busy=B},
%?WARN("New ready:~p busy:~p",[NewState#state.ready, NewState#state.busy]),
{noreply, NewState}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment