Skip to content

Instantly share code, notes, and snippets.

@puzza007
Forked from RJ/gen_server_call_pool.erl
Created August 23, 2010 17:46
Show Gist options
  • Save puzza007/545955 to your computer and use it in GitHub Desktop.
Save puzza007/545955 to your computer and use it in GitHub Desktop.
%%
%% Simple pool for gen_servers that only use :call
%%
-module(gen_server_call_pool).
-behaviour(gen_server).
%% --------------------------------------------------------------------
%% External exports
-export([start_link/3, stats/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(state, {
ready, % worker pids ready
busy, % worker pids currently busy
work, % FIFO: jobs to send to workers
name, % gen_server name to register as
m,f,a
}).
-record(job, {request, % opaque gen_server request tuple
from, % opaque gen_server reply tuple
dob % time job entered work queue
}).
start_link(Name, {M,F,A}, Num) ->
gen_server:start_link({local, Name}, ?MODULE, [Name,{M,F,A},Num], []).
stats(Name) ->
gen_server:call(Name, {?MODULE,stats}).
%% --------------------------------------------------------------------
init([Name, {M,F,A}, Num]) ->
process_flag(trap_exit, true),
% start all workers:
Ready = lists:map(fun(_N) -> {ok, Pid} = erlang:apply(M,F,A), Pid end, lists:seq(1,Num)),
io:format("Started ~B db workers: ~p", [length(Ready), Ready]),
{ok, #state{
ready=Ready,
busy=[],
work=queue:new(),
name=Name,
m=M,f=F,a=A
}}.
handle_call({?MODULE,stats}, _From, State) ->
case queue:out(State#state.work) of
{empty, _} -> T = 0;
{value, #job{dob=Dob}} -> T = timer:now_diff(erlang:now(), Dob)/1000
end,
S = [
{num_busy, length(State#state.busy)},
{num_ready, length(State#state.ready)},
{jobs_queued, queue:len(State#state.work)},
{current_wait, T}
],
{reply, S, State};
% job arrives when worker available immediately
handle_call(Request, From, State=#state{ready=Ready}) when Ready /= [] ->
[Worker|NewReady] = Ready,
F = fun() ->
Reply = gen_server:call(Worker, Request),
% this sends the reply to 'From' and puts worker back in ready list:
gen_server:cast(State#state.name, {send_reply, Worker, From, Reply})
end,
spawn(F),
NewState = State#state{ready=NewReady,busy=[Worker|State#state.busy]},
{noreply, NewState};
% job arrives, no available workers, add to queue
handle_call(Request, From, State=#state{ready=[]}) ->
Job = #job{request=Request, from=From, dob=erlang:now()},
NewWork = queue:in(Job, State#state.work),
io:format("Job queue size: ~p, num workers: ~B", [queue:len(NewWork), length(State#state.busy)]),
NewState = State#state{work = NewWork},
{noreply, NewState}.
% worker finished and sends response, dispatch next job, if any waiting
handle_cast({send_reply, Worker, To, Reply}, State) ->
gen_server:reply(To, Reply),
NewReady = [Worker | State#state.ready],
NewBusy = lists:delete(Worker, State#state.busy),
NewState = State#state{ready=NewReady, busy=NewBusy},
case queue:out(State#state.work) of
{empty, _} ->
{noreply, NewState};
{{value, #job{request=JobRequest, from=JobFrom, dob=Dob}}, NewWork} ->
T = timer:now_diff(erlang:now(), Dob)/1000,
io:format("Queued job started, wait time: ~p ms", [T]),
NewState2 = NewState#state{work=NewWork},
handle_call(JobRequest, JobFrom, NewState2)
end.
handle_info({'EXIT', Pid, Reason}, State = #state{m=M,f=F,a=A}) ->
io:format("WORKER CRASH: ~p ~p ready:~p busy:~p",[Pid, Reason,State#state.ready, State#state.busy]),
R = lists:delete(Pid, State#state.ready),
B = lists:delete(Pid, State#state.busy),
% spawn a new worker to replace the crashed one
{ok, W} = erlang:apply(M,F,A),
io:format("Added new worker to pool to replace crashed: ~p", [W]),
NewState = State#state{ready=[W|R],busy=B},
io:format("New ready:~p busy:~p",[NewState#state.ready, NewState#state.busy]),
{noreply, NewState}.
terminate(Reason, State) ->
ok.
code_change(OldVsn, State, Extra) ->
{ok, State}.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment