Skip to content

Instantly share code, notes, and snippets.

@abhi-bit
Created January 13, 2016 10:01
Show Gist options
  • Save abhi-bit/d58a5ffabe3ee081eab2 to your computer and use it in GitHub Desktop.
Save abhi-bit/d58a5ffabe3ee081eab2 to your computer and use it in GitHub Desktop.
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_view_merger_queue).
% TODO: try to simplify this module, taking into account efficient/practical
% use by couch_view_merger.erl
% public API
-export([start_link/2]).
% consumer API
-export([pop/1, pop_next/1, peek/1, flush/1]).
% producer API
-export([queue/2, done/1]).
% gen_server callbacks
-export([init/1, handle_call/3, handle_info/2, handle_cast/2]).
-export([code_change/3, terminate/2]).
-include("couch_db.hrl").
-record(state, {
rows, % a functional priority queue (skew)
poped = [], % list of so far poped items
consumer = nil, % pop request (only 1 consumer is supported)
less_fun,
num_producers
}).
start_link(NumProducers, LessFun) when is_integer(NumProducers), NumProducers > 0 ->
gen_server:start_link(?MODULE, {NumProducers, LessFun}, []).
pop(Pid) ->
try
gen_server:call(Pid, pop, infinity)
catch
exit:{shutdown, {gen_server, call, [Pid | _]}} ->
closed;
exit:{noproc, {gen_server, call, [Pid | _]}} ->
closed;
exit:{normal, {gen_server, call, [Pid | _]}} ->
closed
end.
pop_next(Pid) ->
gen_server:call(Pid, pop_next, infinity).
peek(Pid) ->
gen_server:call(Pid, peek).
queue(Pid, Row) ->
NextRef = make_ref(),
case wait_previous_row_processed(Pid) of
{ok, MonRef} ->
receive
{'DOWN', MonRef, _, _, _} ->
throw(queue_shutdown)
after 0 ->
erlang:demonitor(MonRef, [flush]),
ok = gen_server:cast(Pid, {queue, Row, self(), NextRef}),
erlang:put(queue_ref, NextRef)
end,
ok;
closed ->
throw(queue_shutdown)
end.
flush(Pid) ->
ok = gen_server:cast(Pid, flush).
% Producers call this when they're done (they will not queue anymore).
done(Pid) ->
case wait_previous_row_processed(Pid) of
{ok, MonRef} ->
receive
{'DOWN', MonRef, _, _, _} ->
ok
after 0 ->
erlang:demonitor(MonRef, [flush]),
ok = gen_server:cast(Pid, done)
end;
closed ->
ok
end.
init({NumProducers, _LessFun}) ->
{ok, PQueue} = merger:new(),
State = #state{
num_producers = NumProducers,
rows = PQueue
%less_fun = fun({_, _, A}, {_, _, B}) -> LessFun(A, B) end
},
{ok, State}.
handle_call(pop, From, #state{poped = []} = State) ->
#state{
rows = Rows,
num_producers = N
} = State,
case merger:size(Rows) < N of
true ->
{noreply, State#state{consumer = From}};
false ->
{ok, {_Key, {MinRow, Pid, Ref}}} = merger:out(Rows),
error_logger:error_msg("ABHI: Poped out ~p~n", [{Pid, Ref, MinRow}]),
{reply, {ok, MinRow}, State#state{rows = Rows, poped = [{Pid, Ref}]}}
end;
handle_call(pop_next, _From, #state{poped = [_ | _] = Poped} = State) ->
#state{rows = Rows, less_fun = _LessFun} = State,
case merger:size(Rows) of
0 ->
{reply, empty, State};
_ ->
{ok, MinRow, {Pid, Ref}} = merger:out(Rows),
%{{Pid, Ref, MinRow}, Rows2} = merger:out(LessFun, Rows),
%NewState = State#state{rows = Rows2, poped = [{Pid, Ref} | Poped]},
error_logger:error_msg("ABHI: pop_next row ~p~n", [{Pid, Ref, MinRow}]),
NewState = State#state{rows = Rows, poped = [{Pid, Ref} | Poped]},
{reply, {ok, MinRow}, NewState}
end;
% Allowed to be called after the first pop in a merge iteration.
handle_call(peek, _From, #state{poped = [_ | _], rows = Rows} = State) ->
case merger:size(Rows) of
0 ->
{reply, empty, State};
_ ->
{_, _, Row} = merger:min(Rows),
error_logger:error_msg("ABHI: peeked row: ~p~n", [Row]),
{reply, {ok, Row}, State}
end.
handle_cast({queue, Row, Pid, Ref}, #state{num_producers = N} = State) when N > 0 ->
#state{
rows = Rows,
consumer = Consumer,
poped = Poped
} = State,
error_logger:error_msg("ABHI: ~p ~p ~p ~p Inserting: ~p Poped: ~p~n",
[?FILE, ?MODULE, ?LINE, ?MACHINE, {Row, Pid, Ref}, Poped]),
Key = parse_row(Row),
case Key of
{row_count, _Count} ->
error_logger:error_msg("ABHI: ~p ~p ~p ~p Consumer: ~p Row: ~p~n",
[?FILE, ?MODULE, ?LINE, ?MACHINE, Consumer, Row]),
gen_server:reply(Consumer, {ok, Row}),
Consumer2 = nil,
Poped2 = [{Pid, Ref} | Poped],
NewState = State#state{
rows = Rows,
consumer = Consumer2,
poped = Poped2
},
{noreply, NewState};
_ ->
error_logger:error_msg("ABHI: Key to be inserted: ~p~n", [Key]),
ok = merger:in(Rows, Key, {Row, Pid, Ref}),
error_logger:error_msg("ABHI: ~p ~p ~p~n", [?FILE, ?MODULE, ?LINE]),
case (Consumer =/= nil) andalso (merger:size(Rows) >= N) of
true ->
{ok, {_MinKey, {MinRow, Pid2, Ref2}}} = merger:out(Rows),
error_logger:error_msg("ABHI: ~p ~p ~p ~p Consumer: ~p MinRow: ~p NumProducers: ~p~n",
[?FILE, ?MODULE, ?LINE, ?MACHINE, Consumer, MinRow, N]),
gen_server:reply(Consumer, {ok, MinRow}),
Poped2 = [{Pid2, Ref2} | Poped],
Consumer2 = nil;
false ->
Poped2 = Poped,
Consumer2 = Consumer
end,
NewState = State#state{
rows = Rows,
consumer = Consumer2,
poped = Poped2
},
{noreply, NewState}
end;
% Consumer should call this after doing its processing of the previously
% poped rows.
handle_cast(flush, #state{consumer = nil} = State) ->
#state{
poped = Poped,
num_producers = N,
rows = Rows
} = State,
lists:foreach(fun({Pid, Ref}) -> Pid ! {ok, Ref} end, Poped),
case (N =:= 0) andalso (merger:size(Rows) =:= 0) of
true ->
{stop, normal, State#state{poped = []}};
false ->
{noreply, State#state{poped = []}}
end;
handle_cast(done, #state{consumer = nil, num_producers = NumProds} = State) ->
NumProds2 = NumProds - 1,
case NumProds2 of
0 ->
case merger:size(State#state.rows) > 0 of
true ->
{noreply, State#state{num_producers = NumProds2}};
false ->
{stop, normal, State#state{num_producers = NumProds2}}
end;
_ ->
{noreply, State#state{num_producers = NumProds2}}
end;
handle_cast(done, #state{poped = []} = State) ->
#state{
rows = Rows,
num_producers = NumProds,
consumer = Consumer
} = State,
NumProds2 = NumProds - 1,
case NumProds2 of
0 ->
gen_server:reply(Consumer, closed),
{stop, normal, State};
_ ->
error_logger:error_msg("ABHI: ~p ~p ~p Size: ~p NumProds2: ~p~n",
[?FILE, ?MODULE, ?LINE, merger:size(Rows), NumProds2]),
case merger:size(Rows) < NumProds2 of
true ->
{noreply, State#state{num_producers = NumProds2}};
false ->
{ok, {_Key, {MinRow, Pid, Ref}}} = merger:out(Rows),
error_logger:error_msg("ABHI: ~p ~p ~p OutElem: ~p~n",
[?FILE, ?MODULE, ?LINE, MinRow]),
gen_server:reply(Consumer, {ok, MinRow}),
NewState = State#state{
num_producers = NumProds2,
consumer = nil,
rows = Rows,
poped = [{Pid, Ref}]
},
{noreply, NewState}
end
end.
handle_info(Msg, State) ->
{stop, {unexpected_msg, Msg}, State}.
terminate(_Reason, _State) ->
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
wait_previous_row_processed(Pid) ->
MRef = erlang:monitor(process, Pid),
CurRef = erlang:erase(queue_ref),
case is_reference(CurRef) of
true ->
receive
{ok, CurRef} ->
{ok, MRef};
{'DOWN', MRef, _, _, _} ->
closed
end;
false ->
{ok, MRef}
end.
parse_row(Row) ->
% error_logger:error_msg("ABHI: parse_row ~p~n", [Row]),
case Row of
{{{json, MRKey}, _DocID}, _JSONRow} ->
MRKey;
{row_count, _Count} ->
Row
end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment