Skip to content

Instantly share code, notes, and snippets.

@fdmanana
Created June 3, 2011 18:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fdmanana/1006926 to your computer and use it in GitHub Desktop.
Save fdmanana/1006926 to your computer and use it in GitHub Desktop.
Multiple CouchDB mappers
diff --git a/src/couchdb/Makefile.am b/src/couchdb/Makefile.am
index f64a036..afc766c 100644
--- a/src/couchdb/Makefile.am
+++ b/src/couchdb/Makefile.am
@@ -88,6 +88,7 @@ source_files = \
couch_view_compactor.erl \
couch_view_updater.erl \
couch_view_group.erl \
+ couch_view_mapper.erl \
couch_db_updater.erl \
couch_work_queue.erl \
json_stream_parse.erl
@@ -157,6 +158,7 @@ compiled_files = \
couch_view_compactor.beam \
couch_view_updater.beam \
couch_view_group.beam \
+ couch_view_mapper.beam \
couch_db_updater.beam \
couch_work_queue.beam \
json_stream_parse.beam
diff --git a/src/couchdb/couch_view_mapper.erl b/src/couchdb/couch_view_mapper.erl
new file mode 100644
index 0000000..51e6a4b
--- /dev/null
+++ b/src/couchdb/couch_view_mapper.erl
@@ -0,0 +1,168 @@
+% Licensed under the Apache License, Version 2.0 (the "License"); you may not
+% use this file except in compliance with the License. You may obtain a copy of
+% the License at
+%
+% http://www.apache.org/licenses/LICENSE-2.0
+%
+% Unless required by applicable law or agreed to in writing, software
+% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+% License for the specific language governing permissions and limitations under
+% the License.
+
+ -module(couch_view_mapper).
+ -behaviour(gen_server).
+
+ % Pool of parallel view mappers: every worker process owns its own query
+ % server process and pushes its mapped results to a shared write queue.
+
+ % public API
+ -export([start_link/3, stop/1, map/3, flush/1]).
+
+ % gen_server callbacks
+ -export([init/1, handle_call/3, handle_cast/2, handle_info/2]).
+ -export([terminate/2, code_change/3]).
+
+ -include("couch_db.hrl").
+
+ -record(state, {
+     % query server processes started in init/1, one per worker
+     procs = [],
+     % workers that were sent a document and have not reported idle yet
+     busy_workers = [],
+     % workers waiting for a document
+     idle_workers = [],
+     % ordset of {Seq, Doc} received while no worker was idle
+     todo = ordsets:new(),
+     % gen_server From of a pending flush/1 call, or nil
+     flusher = nil
+ }).
+
+
+ % Starts a mapper server linked to the caller. It runs MapperCount workers
+ % for the views of Group, and the workers push results to WriteQueue.
+ start_link(Group, MapperCount, WriteQueue) ->
+     InitArgs = {Group, MapperCount, WriteQueue},
+     gen_server:start_link(?MODULE, InitArgs, []).
+
+
+ % Asynchronously asks the mapper server to terminate; returns ok at once.
+ stop(Pid) ->
+     CastResult = gen_server:cast(Pid, stop),
+     ok = CastResult.
+
+
+ % Hands document Doc, at update sequence Seq, to the mapper server without
+ % waiting for it to be mapped.
+ map(Pid, Seq, Doc) ->
+     Request = {map, Seq, Doc},
+     CastResult = gen_server:cast(Pid, Request),
+     ok = CastResult.
+
+
+ % Blocks, with no timeout, until the server reports that nothing is pending
+ % and no worker is busy, i.e. every document passed to map/3 so far has been
+ % mapped and queued for writing.
+ flush(Pid) ->
+     Reply = gen_server:call(Pid, flush, infinity),
+     ok = Reply.
+
+
+ % Starts MapperCount (query server, worker) pairs. All workers start out idle.
+ % Exits are trapped, so gen_server runs terminate/2 (which releases the query
+ % servers) when the parent exits, and linked exits arrive as messages.
+ init({Group, MapperCount, WriteQueue}) ->
+     process_flag(trap_exit, true),
+     Server = self(),
+     StartPair = fun(_Index) ->
+         {ok, Proc} = couch_query_servers:start_doc_map(
+             Group#group.def_lang,
+             [View#view.def || View <- Group#group.views],
+             Group#group.lib),
+         Worker = spawn_link(fun() -> worker_loop(Server, Proc, WriteQueue) end),
+         {Proc, Worker}
+     end,
+     Pairs = [StartPair(Index) || Index <- lists:seq(1, MapperCount)],
+     {Procs, Workers} = lists:unzip(Pairs),
+     {ok, #state{procs = Procs, idle_workers = Workers}}.
+
+
+ % flush: reply at once when no document is pending and no worker is busy.
+ % Otherwise remember the caller, so the reply can be sent later when the last
+ % busy worker reports idle.
+ handle_call(flush, From, #state{todo = Pending, busy_workers = Busy} = State) ->
+     case {Pending, Busy} of
+     {[], []} ->
+         {reply, ok, State};
+     _ ->
+         {noreply, State#state{flusher = From}}
+     end.
+
+
+ % {map, Seq, Doc}: send the document to the first idle worker and mark that
+ % worker busy. If no worker is idle, add the document to the Seq-ordered
+ % backlog (todo). stop: terminate the server normally.
+ handle_cast({map, Seq, Doc}, #state{idle_workers = []} = State) ->
+     Backlog = ordsets:add_element({Seq, Doc}, State#state.todo),
+     {noreply, State#state{todo = Backlog}};
+ handle_cast({map, Seq, Doc}, #state{idle_workers = [Worker | StillIdle]} = State) ->
+     Worker ! {map, Seq, Doc},
+     NowBusy = [Worker | State#state.busy_workers],
+     {noreply, State#state{idle_workers = StillIdle, busy_workers = NowBusy}};
+ handle_cast(stop, State) ->
+     {stop, normal, State}.
+
+
+ % {idle, Worker}: a worker finished its previous document. If documents are
+ % pending, hand it the lowest {Seq, Doc} and keep it busy. Otherwise move it
+ % to the idle list and, once no worker is busy, answer a waiting flush/1 call.
+ %
+ % {'EXIT', Pid, Reason}: init/1 traps exits, so a crashed linked process (for
+ % example a worker) is delivered here as a message. Stop with an explicit
+ % reason instead of failing with function_clause on an unmatched message.
+ handle_info({idle, Worker}, #state{todo = [{Seq, Doc} | Rest]} = State) ->
+     Worker ! {map, Seq, Doc},
+     {noreply, State#state{todo = Rest}};
+ handle_info({idle, Worker}, #state{todo = []} = State) ->
+     #state{idle_workers = Idle, busy_workers = Busy} = State,
+     State2 = State#state{
+         idle_workers = [Worker | Idle],
+         busy_workers = Busy -- [Worker]
+     },
+     {noreply, maybe_reply_flusher(State2)};
+ handle_info({'EXIT', Pid, Reason}, State) ->
+     {stop, {mapper_worker_exit, Pid, Reason}, State}.
+
+ % Once no worker is busy, replies ok to the pending flush/1 caller (if any)
+ % and clears it.
+ maybe_reply_flusher(#state{busy_workers = [], flusher = nil} = State) ->
+     State;
+ maybe_reply_flusher(#state{busy_workers = [], flusher = From} = State) ->
+     gen_server:reply(From, ok),
+     State#state{flusher = nil};
+ maybe_reply_flusher(State) ->
+     State.
+
+
+ % Shuts the pool down. First it kills every worker, then it releases the
+ % query server processes. The workers are linked to this server but do not
+ % trap exits, so they ignore the 'normal' exit signal sent when stop/1 ends
+ % the server. Without the explicit kill they would stay blocked in receive
+ % forever.
+ terminate(_Reason, #state{procs = Procs} = State) ->
+     #state{idle_workers = Idle, busy_workers = Busy} = State,
+     lists:foreach(
+         fun(Worker) ->
+             % unlink first so the kill is not echoed back as an 'EXIT' message
+             unlink(Worker),
+             exit(Worker, kill)
+         end,
+         Idle ++ Busy),
+     lists:foreach(fun couch_query_servers:stop_doc_map/1, Procs).
+
+
+ % Hot code upgrade: the state record is carried over unchanged.
+ code_change(_Vsn, CurrentState, _UpgradeExtra) -> {ok, CurrentState}.
+
+
+ % Worker process body. For each {map, Seq, Doc} message it maps the document
+ % with its own query server Proc, queues {Seq, DocId, Result} on WriteQueue,
+ % and then reports {idle, self()} to Parent. Deleted documents skip the query
+ % server and produce an empty result. Workers run in parallel, so items reach
+ % WriteQueue in completion order, not necessarily in Seq order.
+ worker_loop(Parent, Proc, WriteQueue) ->
+     receive
+     {map, Seq, #doc{id = Id, deleted = true}} ->
+         queue_result(Parent, WriteQueue, {Seq, Id, []});
+     {map, Seq, #doc{id = Id} = Doc} ->
+         {ok, Result} = couch_query_servers:map_doc_raw(Proc, Doc),
+         queue_result(Parent, WriteQueue, {Seq, Id, Result})
+     end,
+     worker_loop(Parent, Proc, WriteQueue).
+
+ % Queues one mapped item, crashing if the queue does not answer ok, then
+ % tells Parent this worker is free for the next document.
+ queue_result(Parent, WriteQueue, Item) ->
+     ok = couch_work_queue:queue(WriteQueue, Item, item_size(Item)),
+     Parent ! {idle, self()}.
+
+
+ % Size estimate passed to couch_work_queue:queue/3 for a write item. It is
+ % the byte size of the doc id plus the size of the map result:
+ %   - nothing for a deleted doc ([]),
+ %   - the raw JSON binary size for {json, Bin},
+ %   - the external term size for any other EJSON term.
+ item_size({_Seq, DocId, []}) ->
+     byte_size(DocId);
+ item_size({_Seq, DocId, {json, RawJson}}) ->
+     byte_size(DocId) + byte_size(RawJson);
+ item_size({_Seq, DocId, EJson}) ->
+     byte_size(DocId) + ejson_size(EJson).
+
+ % Encoded size of an EJSON term. Any error from erlang:external_size/1 falls
+ % back to the size of the encoded binary. NOTE(review): presumably this guards
+ % against VMs lacking external_size/1; confirm.
+ ejson_size(EJson) ->
+     try
+         erlang:external_size(EJson)
+     catch _:_ ->
+         byte_size(?term_to_bin(EJson))
+     end.
diff --git a/src/couchdb/couch_view_updater.erl b/src/couchdb/couch_view_updater.erl
index f5fefc8..27cc808 100644
--- a/src/couchdb/couch_view_updater.erl
+++ b/src/couchdb/couch_view_updater.erl
@@ -16,6 +16,9 @@
-include("couch_db.hrl").
+% TODO: make this .ini configurable
+-define(NUM_MAPPERS, 4).
+
-spec update(_, #group{}) -> no_return().
update(Owner, Group) ->
@@ -43,10 +46,14 @@ update(Owner, Group) ->
{ok, WriteQueue} = couch_work_queue:new(
[{max_size, 100000}, {max_items, 500}]),
Self = self(),
- ViewEmptyKVs = [{View, []} || View <- Group2#group.views],
spawn_link(fun() ->
- do_maps(add_query_server(Group2), MapQueue, WriteQueue)
+ {ok, Mapper} = couch_view_mapper:start_link(
+ Group2, ?NUM_MAPPERS, WriteQueue),
+ do_maps(MapQueue, Mapper),
+ ok = couch_view_mapper:stop(Mapper),
+ ok = couch_work_queue:close(WriteQueue)
end),
+ ViewEmptyKVs = [{View, []} || View <- Group2#group.views],
spawn_link(fun() ->
do_writes(Self, Owner, Group2, WriteQueue, Seq == 0, ViewEmptyKVs)
end),
@@ -82,17 +89,6 @@ update(Owner, Group) ->
NewGroup#group{current_seq=couch_db:get_update_seq(Db)}})
end.
-
-add_query_server(#group{query_server = nil} = Group) ->
- {ok, Qs} = couch_query_servers:start_doc_map(
- Group#group.def_lang,
- [View#view.def || View <- Group#group.views],
- Group#group.lib),
- Group#group{query_server = Qs};
-add_query_server(Group) ->
- Group.
-
-
purge_index(#group{db=Db, views=Views, id_btree=IdBtree}=Group) ->
{ok, PurgedIdsRevs} = couch_db:get_last_purged(Db),
Ids = [Id || {Id, _Revs} <- PurgedIdsRevs],
@@ -145,35 +141,16 @@ load_doc(Db, DocInfo, MapQueue, DocOpts, IncludeDesign) ->
end
end.
-do_maps(#group{query_server = Qs} = Group, MapQueue, WriteQueue) ->
+% Dequeues batches of {Seq, Doc} from MapQueue and hands each document to the
+% couch_view_mapper pool. It then blocks in flush/1 until the whole batch has
+% been mapped, and only after that dequeues the next batch. Returns ok once
+% MapQueue is closed. Stopping Mapper and closing the write queue are left to
+% the caller.
+do_maps(MapQueue, Mapper) ->
case couch_work_queue:dequeue(MapQueue) of
closed ->
- couch_work_queue:close(WriteQueue),
- couch_query_servers:stop_doc_map(Group#group.query_server);
+ ok;
{ok, Queue} ->
lists:foreach(
- fun({Seq, #doc{id = Id, deleted = true}}) ->
- Item = {Seq, Id, []},
- ok = couch_work_queue:queue(WriteQueue, Item, item_size(Item));
- ({Seq, #doc{id = Id, deleted = false} = Doc}) ->
- {ok, Result} = couch_query_servers:map_doc_raw(Qs, Doc),
- Item = {Seq, Id, Result},
- ok = couch_work_queue:queue(WriteQueue, Item, item_size(Item))
- end,
+ % map/3 only casts the document to the pool; it does not wait for the result.
+ fun({Seq, Doc}) -> ok = couch_view_mapper:map(Mapper, Seq, Doc) end,
Queue),
- do_maps(Group, MapQueue, WriteQueue)
- end.
-
-item_size({_Seq, Id, []}) ->
- byte_size(Id);
-item_size({_Seq, Id, {json, Json}}) ->
- byte_size(Id) + byte_size(Json);
-item_size({_Seq, Id, EJson}) ->
- byte_size(Id) +
- try
- erlang:external_size(EJson)
- catch _:_ ->
- byte_size(?term_to_bin(EJson))
+ % Wait for the whole batch, so at most one batch is in flight in the pool.
+ ok = couch_view_mapper:flush(Mapper),
+ do_maps(MapQueue, Mapper)
end.
do_writes(Parent, Owner, Group, WriteQueue, InitialBuild, ViewEmptyKVs) ->
@@ -181,20 +158,21 @@ do_writes(Parent, Owner, Group, WriteQueue, InitialBuild, ViewEmptyKVs) ->
closed ->
Parent ! {new_group, Group};
{ok, Queue} ->
- {ViewKVs, DocIdViewIdKeys} = lists:foldr(
- fun({_Seq, Id, []}, {ViewKVsAcc, DocIdViewIdKeysAcc}) ->
- {ViewKVsAcc, [{Id, []} | DocIdViewIdKeysAcc]};
- ({_Seq, Id, RawQueryResults}, {ViewKVsAcc, DocIdViewIdKeysAcc}) ->
+ {ViewKVs, DocIdViewIdKeys, NewSeq} = lists:foldr(
+ fun({Seq, Id, []}, {ViewKVsAcc, DocIdViewIdKeysAcc, MaxSeq}) ->
+ MaxSeq2 = erlang:max(Seq, MaxSeq),
+ {ViewKVsAcc, [{Id, []} | DocIdViewIdKeysAcc], MaxSeq2};
+ ({Seq, Id, RawQueryResults}, {ViewKVsAcc, DocIdViewIdKeysAcc, MaxSeq}) ->
QueryResults = [
[list_to_tuple(FunResult) || FunResult <- FunRs] || FunRs <-
couch_query_servers:raw_to_ejson(RawQueryResults)
],
{NewViewKVs, NewViewIdKeys} = view_insert_doc_query_results(
Id, QueryResults, ViewKVsAcc, [], []),
- {NewViewKVs, [{Id, NewViewIdKeys} | DocIdViewIdKeysAcc]}
+ MaxSeq2 = erlang:max(Seq, MaxSeq),
+ {NewViewKVs, [{Id, NewViewIdKeys} | DocIdViewIdKeysAcc], MaxSeq2}
end,
- {ViewEmptyKVs, []}, Queue),
- {NewSeq, _, _} = lists:last(Queue),
+ {ViewEmptyKVs, [], 0}, Queue),
Group2 = write_changes(
Group, ViewKVs, DocIdViewIdKeys, NewSeq, InitialBuild),
case Owner of
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment