Skip to content

Instantly share code, notes, and snippets.

@tilgovi
Created August 31, 2010 20:54
Show Gist options
  • Save tilgovi/559737 to your computer and use it in GitHub Desktop.
Save tilgovi/559737 to your computer and use it in GitHub Desktop.
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl
index 7678f6c..df7c75b 100644
--- a/src/couchdb/couch_db.erl
+++ b/src/couchdb/couch_db.erl
@@ -645,7 +645,7 @@ update_docs(Db, Docs, Options, replicated_changes) ->
DocErrors = [],
DocBuckets3 = DocBuckets
end,
- DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd)
+ DocBuckets4 = [[check_dup_atts(Doc)
|| Doc <- Bucket] || Bucket <- DocBuckets3],
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]),
{ok, DocErrors};
@@ -700,8 +700,8 @@ update_docs(Db, Docs, Options, interactive_edit) ->
Options2 = if AllOrNothing -> [merge_conflicts];
true -> [] end ++ Options,
DocBuckets3 = [[
- doc_flush_atts(set_new_att_revpos(
- check_dup_atts(Doc)), Db#db.fd)
+ set_new_att_revpos(
+ check_dup_atts(Doc))
|| Doc <- B] || B <- DocBuckets2],
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []),
@@ -755,21 +755,22 @@ collect_results(UpdatePid, MRef, ResultsAcc) ->
exit(Reason)
end.
-write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets,
+write_and_commit(#db{update_pid=UpdatePid, fd=Fd, user_ctx=Ctx}=Db, DocBuckets0,
NonRepDocs, Options0) ->
Options = set_commit_option(Options0),
MergeConflicts = lists:member(merge_conflicts, Options),
FullCommit = lists:member(full_commit, Options),
MRef = erlang:monitor(process, UpdatePid),
+ DocBuckets1 = [[doc_flush_prep(Doc, Fd) || Doc <- Bucket] || Bucket <- DocBuckets0],
try
- UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit},
+ UpdatePid ! {update_docs, self(), DocBuckets1, NonRepDocs, MergeConflicts, FullCommit},
case collect_results(UpdatePid, MRef, []) of
{ok, Results} -> {ok, Results};
retry ->
% This can happen if the db file we wrote to was swapped out by
% compaction. Retry by reopening the db and writing to the current file
{ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx),
- DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets],
+ DocBuckets2 = [[doc_flush_prep(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets0],
% We only retry once
close(Db2),
UpdatePid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit},
@@ -790,7 +791,20 @@ set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) ->
(Att) ->
Att#att{revpos=RevPos+1}
end, Atts)}.
-
+
+%% @doc Prepare a document for hand-off to the updater process.
+%% Flushes the document's attachments to BinFd (via doc_flush_atts/2), then
+%% serializes the body together with the on-disk attachment tuples using
+%% term_to_binary/1, so that the receiver only has to append an opaque binary.
+%% Returns Doc with `atts' replaced by the disk tuples and `body' replaced by
+%% `{BinFd, DiskBin}'.
+doc_flush_prep(Doc, BinFd) ->
+    Doc2 = doc_flush_atts(Doc, BinFd),
+    % The comprehension already yields [] for an empty attachment list, so no
+    % separate empty-list branch is needed. Only attachments whose data is a
+    % 2-tuple match the generator pattern; P is the second element of that
+    % tuple.
+    DiskAtts =
+        [{N,T,P,AL,DL,R,M,E}
+         || #att{name=N,type=T,data={_,P},md5=M,revpos=R,
+                 att_len=AL,disk_len=DL,encoding=E}
+            <- Doc2#doc.atts],
+    DiskBin = term_to_binary({Doc2#doc.body, DiskAtts}),
+    % Tag the serialized summary with the file it was prepared for.
+    Doc#doc{atts=DiskAtts, body={BinFd, DiskBin}}.
doc_flush_atts(Doc, Fd) ->
Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}.
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl
index 19a4c16..38f6e51 100644
--- a/src/couchdb/couch_db_updater.erl
+++ b/src/couchdb/couch_db_updater.erl
@@ -439,49 +439,70 @@ refresh_validate_doc_funs(Db) ->
% rev tree functions
-flush_trees(_Db, [], AccFlushedTrees) ->
- {ok, lists:reverse(AccFlushedTrees)};
-flush_trees(#db{fd=Fd,header=Header}=Db,
- [InfoUnflushed | RestUnflushed], AccFlushed) ->
- #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed,
- Flushed = couch_key_tree:map(
- fun(_Rev, Value) ->
+%% @doc Wait for the flush worker identified by {FlushPid, FlushRef} (a
+%% pid/monitor-reference pair as returned by spawn_monitor/1) to report the
+%% pointer of the summary it wrote, and return that pointer.
+%% Exits with the worker's exit reason if the worker goes down without
+%% having reported a result.
+flush_result({FlushPid, FlushRef}) ->
+    receive
+    {flush_result, FlushPid, {ok, NewSummaryPointer}} ->
+        ?LOG_DEBUG("Updater received flush result for ~p", [FlushRef]),
+        % Drop the monitor together with any 'DOWN' it already delivered.
+        erlang:demonitor(FlushRef, [flush]),
+        NewSummaryPointer;
+    {'DOWN', FlushRef, process, FlushPid, Reason} ->
+        % Match this worker's monitor only, so 'DOWN' messages that belong to
+        % other monitors stay in the mailbox for their owners. A worker sends
+        % its result before it terminates and signals between two processes
+        % are delivered in order, so reaching this clause means the worker
+        % died without producing a result.
+        exit(Reason)
+    end.
+
+%% @doc Write the unflushed document summaries held in the rev trees of the
+%% given #full_doc_info{} records to the database file.
+%%
+%% Works in two passes. While the list of unflushed infos is walked, one
+%% monitored worker process is spawned per pre-serialized summary
+%% (`#doc{body={Fd, BodyBin}}') to append it to Fd. Once the list is
+%% exhausted, every #doc{} node is replaced by
+%% `{IsDeleted, SummaryPointer, UpdateSeq}' using the pointer reported by its
+%% worker.
+%%
+%% Returns `{ok, FlushedFullDocInfos}' in the order the infos were given.
+%% Throws `retry' when a summary was prepared for a file other than Fd.
+flush_trees(_Db, [], AccDocInfo, []) ->
+    % No flush workers were started, so there is nothing to wait for.
+    {ok, lists:reverse(AccDocInfo)};
+flush_trees(_Db, [], AccDocInfo, AccFlushRefs) ->
+    % Both accumulators were built by prepending. Put them back into
+    % submission order so that each tree is walked in the same order in which
+    % its workers were spawned and every ref is paired with the node it was
+    % spawned for.
+    {Flushed, []} =
+    lists:mapfoldl(
+        fun(#full_doc_info{update_seq=UpdateSeq, rev_tree=RevTree}=DocInfo,
+                FlushRefs) ->
+            {FlushedTree, FlushRefs2} =
+            couch_key_tree:mapfoldl(
+                fun(_Rev, Value, FlushRefs3) ->
+                    case Value of
+                    #doc{deleted=IsDeleted} ->
+                        [Ref|Rest] = FlushRefs3,
+                        {{IsDeleted, flush_result(Ref), UpdateSeq}, Rest};
+                    _ ->
+                        {Value, FlushRefs3}
+                    end
+                end, FlushRefs, RevTree),
+            {DocInfo#full_doc_info{rev_tree=FlushedTree}, FlushRefs2}
+        end, lists:reverse(AccFlushRefs), lists:reverse(AccDocInfo)),
+    {ok, Flushed};
+flush_trees(#db{fd=Fd,header=Header}=Db, [InfoUnflushed | RestUnflushed],
+        AccDocInfo, AccFlushRefs) ->
+    #full_doc_info{rev_tree=RevTree} = InfoUnflushed,
+    AccFlushRefs2 = couch_key_tree:foldl(
+        fun(_Rev, Value, Acc) ->
case Value of
- #doc{atts=Atts,deleted=IsDeleted}=Doc ->
+            #doc{body={BinFd, _}} when BinFd /= Fd ->
+                % The summary was prepared for a file other than our Fd. This
+                % can happen when a database is being switched out during a
+                % compaction
+                ?LOG_DEBUG("File where the attachments are written has"
+                    " changed. Possibly retrying.", []),
+                throw(retry);
+            #doc{body={Fd, BodyBin}} ->
% this node value is actually an unwritten document summary,
% write to disk.
- % make sure the Fd in the written bins is the same Fd we are
- % and convert bins, removing the FD.
- % All bins should have been written to disk already.
- DiskAtts =
- case Atts of
- [] -> [];
- [#att{data={BinFd, _Sp}} | _ ] when BinFd == Fd ->
- [{N,T,P,AL,DL,R,M,E}
- || #att{name=N,type=T,data={_,P},md5=M,revpos=R,
- att_len=AL,disk_len=DL,encoding=E}
- <- Atts];
- _ ->
- % BinFd must not equal our Fd. This can happen when a database
- % is being switched out during a compaction
- ?LOG_DEBUG("File where the attachments are written has"
- " changed. Possibly retrying.", []),
- throw(retry)
- end,
- {ok, NewSummaryPointer} =
- case Header#db_header.disk_version < 4 of
- true ->
- couch_file:append_term(Fd, {Doc#doc.body, DiskAtts});
- false ->
- couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts})
- end,
- {IsDeleted, NewSummaryPointer, UpdateSeq};
+                % The body is already serialized, so append it as a raw
+                % binary from a monitored worker; the pointer is collected
+                % later through flush_result/1.
+                Parent = self(),
+                {FlushPid, FlushRef} = spawn_monitor(fun() ->
+                    {ok, NewSummaryPointer} =
+                    case Header#db_header.disk_version < 4 of
+                    true ->
+                        couch_file:append_binary(Fd, BodyBin);
+                    false ->
+                        couch_file:append_binary_md5(Fd, BodyBin)
+                    end,
+                    Parent ! {flush_result, self(), {ok, NewSummaryPointer}} end),
+                [{FlushPid, FlushRef}|Acc];
_ ->
- Value
+                Acc
end
- end, Unflushed),
- flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]).
-
+        end, AccFlushRefs, RevTree),
+    flush_trees(Db, RestUnflushed, [InfoUnflushed | AccDocInfo], AccFlushRefs2).
send_result(Client, Id, OriginalRevs, NewResult) ->
% used to send a result to the client
@@ -605,7 +626,7 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) ->
% Write out the document summaries (the bodies are stored in the nodes of
% the trees, the attachments are already written to disk)
- {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []),
+ {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, [], []),
{IndexFullDocInfos, IndexDocInfos} =
new_index_entries(FlushedFullDocInfos, [], []),
diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl
index 4fe09bf..ad4d092 100644
--- a/src/couchdb/couch_key_tree.erl
+++ b/src/couchdb/couch_key_tree.erl
@@ -13,7 +13,7 @@
-module(couch_key_tree).
-export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]).
--export([map/2, get_all_leafs/1, count_leafs/1, remove_leafs/2,
+-export([map/2, foldl/3, mapfoldl/3, get_all_leafs/1, count_leafs/1, remove_leafs/2,
get_all_leafs_full/1,stem/2,map_leafs/2]).
% a key tree looks like this:
@@ -294,6 +294,33 @@ map_simple(Fun, Pos, [{Key, Value, SubTree} | RestTree]) ->
if SubTree == [] -> leaf; true -> branch end),
[{Key, Value2, map_simple(Fun, Pos + 1, SubTree)} | map_simple(Fun, Pos, RestTree)].
+%% @doc Fold Fun over every node of a key tree given as a list of
+%% {StartPos, Tree} pairs. Fun is called as Fun({Pos, Key}, Value, Acc) and
+%% returns the new accumulator; the final accumulator is returned.
+foldl(Fun, Acc0, Trees) ->
+    lists:foldl(
+        fun({Pos, Tree}, Acc) ->
+            foldl_simple(Fun, Acc, Pos, [Tree])
+        end, Acc0, Trees).
+
+%% Fold Fun over a list of sibling {Key, Value, SubTree} nodes located at
+%% depth Pos: each node is visited first, then its subtree at depth Pos + 1,
+%% then the remaining siblings.
+foldl_simple(Fun, Acc0, Pos, Nodes) ->
+    lists:foldl(
+        fun({Key, Value, SubTree}, Acc) ->
+            foldl_simple(Fun, Fun({Pos, Key}, Value, Acc), Pos + 1, SubTree)
+        end, Acc0, Nodes).
+
+%% @doc Combined map and fold over a key tree given as a list of
+%% {StartPos, Tree} pairs. Fun is called as Fun({Pos, Key}, Value, Acc) and
+%% returns {NewValue, NewAcc}. Returns {NewTrees, FinalAcc}, where NewTrees
+%% has the shape of the input with every value replaced.
+mapfoldl(Fun, Acc0, Trees) ->
+    lists:mapfoldl(
+        fun({Pos, Tree}, Acc) ->
+            {[NewTree], Acc2} = mapfoldl_simple(Fun, Acc, Pos, [Tree]),
+            {{Pos, NewTree}, Acc2}
+        end, Acc0, Trees).
+
+%% Map-and-fold over a list of sibling {Key, Value, SubTree} nodes located at
+%% depth Pos. Each node is visited first, then its subtree, then the
+%% remaining siblings. Returns {NewNodes, FinalAcc}.
+mapfoldl_simple(_Fun, Acc, _Pos, []) ->
+    {[], Acc};
+mapfoldl_simple(Fun, Acc, Pos, [{Key, Value, SubTree} | RestTree]) ->
+    {Value2, Acc2} = Fun({Pos, Key}, Value, Acc),
+    % Children sit one level deeper than their parent; siblings share Pos.
+    {NewSubTree, Acc3} = mapfoldl_simple(Fun, Acc2, Pos + 1, SubTree),
+    {NewRestTree, Acc4} = mapfoldl_simple(Fun, Acc3, Pos, RestTree),
+    {[{Key, Value2, NewSubTree} | NewRestTree], Acc4}.
map_leafs(_Fun, []) ->
[];
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment