Created
August 31, 2010 20:54
-
-
Save tilgovi/559737 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/couchdb/couch_db.erl b/src/couchdb/couch_db.erl | |
index 7678f6c..df7c75b 100644 | |
--- a/src/couchdb/couch_db.erl | |
+++ b/src/couchdb/couch_db.erl | |
@@ -645,7 +645,7 @@ update_docs(Db, Docs, Options, replicated_changes) -> | |
DocErrors = [], | |
DocBuckets3 = DocBuckets | |
end, | |
- DocBuckets4 = [[doc_flush_atts(check_dup_atts(Doc), Db#db.fd) | |
+ DocBuckets4 = [[check_dup_atts(Doc) | |
|| Doc <- Bucket] || Bucket <- DocBuckets3], | |
{ok, []} = write_and_commit(Db, DocBuckets4, [], [merge_conflicts | Options]), | |
{ok, DocErrors}; | |
@@ -700,8 +700,8 @@ update_docs(Db, Docs, Options, interactive_edit) -> | |
Options2 = if AllOrNothing -> [merge_conflicts]; | |
true -> [] end ++ Options, | |
DocBuckets3 = [[ | |
- doc_flush_atts(set_new_att_revpos( | |
- check_dup_atts(Doc)), Db#db.fd) | |
+ set_new_att_revpos( | |
+ check_dup_atts(Doc)) | |
|| Doc <- B] || B <- DocBuckets2], | |
{DocBuckets4, IdRevs} = new_revs(DocBuckets3, [], []), | |
@@ -755,21 +755,22 @@ collect_results(UpdatePid, MRef, ResultsAcc) -> | |
exit(Reason) | |
end. | |
-write_and_commit(#db{update_pid=UpdatePid, user_ctx=Ctx}=Db, DocBuckets, | |
+write_and_commit(#db{update_pid=UpdatePid, fd=Fd, user_ctx=Ctx}=Db, DocBuckets0, | |
NonRepDocs, Options0) -> | |
Options = set_commit_option(Options0), | |
MergeConflicts = lists:member(merge_conflicts, Options), | |
FullCommit = lists:member(full_commit, Options), | |
MRef = erlang:monitor(process, UpdatePid), | |
+ DocBuckets1 = [[doc_flush_prep(Doc, Fd) || Doc <- Bucket] || Bucket <- DocBuckets0], | |
try | |
- UpdatePid ! {update_docs, self(), DocBuckets, NonRepDocs, MergeConflicts, FullCommit}, | |
+ UpdatePid ! {update_docs, self(), DocBuckets1, NonRepDocs, MergeConflicts, FullCommit}, | |
case collect_results(UpdatePid, MRef, []) of | |
{ok, Results} -> {ok, Results}; | |
retry -> | |
% This can happen if the db file we wrote to was swapped out by | |
% compaction. Retry by reopening the db and writing to the current file | |
{ok, Db2} = open_ref_counted(Db#db.main_pid, Ctx), | |
- DocBuckets2 = [[doc_flush_atts(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets], | |
+ DocBuckets2 = [[doc_flush_prep(Doc, Db2#db.fd) || Doc <- Bucket] || Bucket <- DocBuckets0], | |
% We only retry once | |
close(Db2), | |
UpdatePid ! {update_docs, self(), DocBuckets2, NonRepDocs, MergeConflicts, FullCommit}, | |
@@ -790,7 +791,20 @@ set_new_att_revpos(#doc{revs={RevPos,_Revs},atts=Atts}=Doc) -> | |
(Att) -> | |
Att#att{revpos=RevPos+1} | |
end, Atts)}. | |
- | |
+ | |
+%% Prepares a document for hand-off to the updater process. | |
+%% Runs doc_flush_atts/2 against BinFd, converts the resulting #att{} records | |
+%% into the 8-tuples stored on disk, and serializes {Body, DiskAtts} with | |
+%% term_to_binary/1. Returns the doc with atts set to the disk tuples and | |
+%% body set to {BinFd, SerializedBinary}. | |
+doc_flush_prep(Doc, BinFd) -> | |
+    #doc{body=Body, atts=FlushedAtts} = doc_flush_atts(Doc, BinFd), | |
+    % An empty attachment list simply yields an empty comprehension result. | |
+    DiskAtts = | |
+        [{Name, Type, Pointer, AttLen, DiskLen, RevPos, Md5, Enc} | |
+            || #att{name=Name, type=Type, data={_, Pointer}, md5=Md5, | |
+                    revpos=RevPos, att_len=AttLen, disk_len=DiskLen, | |
+                    encoding=Enc} <- FlushedAtts], | |
+    SerializedBody = term_to_binary({Body, DiskAtts}), | |
+    Doc#doc{atts=DiskAtts, body={BinFd, SerializedBody}}. | |
doc_flush_atts(Doc, Fd) -> | |
Doc#doc{atts=[flush_att(Fd, Att) || Att <- Doc#doc.atts]}. | |
diff --git a/src/couchdb/couch_db_updater.erl b/src/couchdb/couch_db_updater.erl | |
index 19a4c16..38f6e51 100644 | |
--- a/src/couchdb/couch_db_updater.erl | |
+++ b/src/couchdb/couch_db_updater.erl | |
@@ -439,49 +439,70 @@ refresh_validate_doc_funs(Db) -> | |
% rev tree functions | |
-flush_trees(_Db, [], AccFlushedTrees) -> | |
- {ok, lists:reverse(AccFlushedTrees)}; | |
-flush_trees(#db{fd=Fd,header=Header}=Db, | |
- [InfoUnflushed | RestUnflushed], AccFlushed) -> | |
- #full_doc_info{update_seq=UpdateSeq, rev_tree=Unflushed} = InfoUnflushed, | |
- Flushed = couch_key_tree:map( | |
- fun(_Rev, Value) -> | |
+%% Waits for the result of one spawned summary writer. | |
+%% Takes the {Pid, MonitorRef} pair returned by spawn_monitor and returns the | |
+%% summary pointer the writer sent back as {flush_result, Pid, {ok, Pointer}}. | |
+%% Exits with the writer's exit reason if that writer goes down before its | |
+%% result has been received. | |
+flush_result({FlushPid, FlushRef}) -> | |
+    receive | |
+        {flush_result, FlushPid, {ok, NewSummaryPointer}} -> | |
+            ?LOG_DEBUG("Updater received flush result for ~p", [FlushRef]), | |
+            % Drop the monitor together with any 'DOWN' it already delivered. | |
+            erlang:demonitor(FlushRef, [flush]), | |
+            NewSummaryPointer; | |
+        % Match only this writer's own monitor: 'DOWN' messages belonging to | |
+        % other writers or to unrelated monitors must stay in the mailbox for | |
+        % whoever is waiting on them instead of being silently consumed here. | |
+        {'DOWN', FlushRef, process, FlushPid, Reason} -> | |
+            exit(Reason) | |
+    end. | |
+ | |
+%% flush_trees(Db, UnflushedInfos, AccDocInfo, AccFlushRefs) | |
+%% Writes every pre-serialized document summary found in the rev trees to the | |
+%% db file (one spawned writer per summary), then replaces each #doc{} node | |
+%% with {IsDeleted, SummaryPointer, UpdateSeq}. | |
+%% Returns {ok, FullDocInfos} in input order. Throws retry when a summary was | |
+%% prepared against a file descriptor other than the one in Db. | |
+flush_trees(_Db, [], AccDocInfo, []) -> | |
+    % No writers were started, so there is nothing to collect. | |
+    {ok, lists:reverse(AccDocInfo)}; | |
+flush_trees(_Db, [], AccDocInfo, AccFlushRefs) -> | |
+    % AccDocInfo is in reverse input order and AccFlushRefs is lined up with | |
+    % it; prepending while folding restores the input order. | |
+    {Flushed, []} = | |
+        lists:foldl( | |
+            fun(#full_doc_info{update_seq=UpdateSeq, rev_tree=RevTree}=DocInfo, | |
+                    {Acc, FlushRefs}) -> | |
+                {FlushedTree, FlushRefs2} = | |
+                    couch_key_tree:mapfoldl( | |
+                        fun(_Rev, Value, FlushRefs3) -> | |
+                            case Value of | |
+                                #doc{deleted=IsDeleted} -> | |
+                                    [Ref|Rest] = FlushRefs3, | |
+                                    {{IsDeleted, flush_result(Ref), UpdateSeq}, Rest}; | |
+                                _ -> | |
+                                    {Value, FlushRefs3} | |
+                            end | |
+                        end, FlushRefs, RevTree), | |
+                {[DocInfo#full_doc_info{rev_tree=FlushedTree}|Acc], FlushRefs2} | |
+            end, {[], AccFlushRefs}, AccDocInfo), | |
+    {ok, Flushed}; | |
+flush_trees(#db{fd=Fd,header=Header}=Db, [InfoUnflushed | RestUnflushed], | |
+        AccDocInfo, AccFlushRefs) -> | |
+    #full_doc_info{rev_tree=RevTree} = InfoUnflushed, | |
+    TreeFlushRefs = couch_key_tree:foldl( | |
+        fun(_Rev, Value, Acc) -> | |
case Value of | |
- #doc{atts=Atts,deleted=IsDeleted}=Doc -> | |
+            % The prepared body is the 2-tuple {BinFd, DiskBin}. | |
+            #doc{body={BinFd, _}} when BinFd /= Fd -> | |
+                % BinFd must not equal our Fd. This can happen when a database | |
+                % is being switched out during a compaction | |
+                ?LOG_DEBUG("File where the attachments are written has" | |
+                    " changed. Possibly retrying.", []), | |
+                throw(retry); | |
+            #doc{body={Fd, BodyBin}} -> | |
% this node value is actually an unwritten document summary, | |
% write to disk. | |
- % make sure the Fd in the written bins is the same Fd we are | |
- % and convert bins, removing the FD. | |
- % All bins should have been written to disk already. | |
- DiskAtts = | |
- case Atts of | |
- [] -> []; | |
- [#att{data={BinFd, _Sp}} | _ ] when BinFd == Fd -> | |
- [{N,T,P,AL,DL,R,M,E} | |
- || #att{name=N,type=T,data={_,P},md5=M,revpos=R, | |
- att_len=AL,disk_len=DL,encoding=E} | |
- <- Atts]; | |
- _ -> | |
- % BinFd must not equal our Fd. This can happen when a database | |
- % is being switched out during a compaction | |
- ?LOG_DEBUG("File where the attachments are written has" | |
- " changed. Possibly retrying.", []), | |
- throw(retry) | |
- end, | |
- {ok, NewSummaryPointer} = | |
- case Header#db_header.disk_version < 4 of | |
- true -> | |
- couch_file:append_term(Fd, {Doc#doc.body, DiskAtts}); | |
- false -> | |
- couch_file:append_term_md5(Fd, {Doc#doc.body, DiskAtts}) | |
- end, | |
- {IsDeleted, NewSummaryPointer, UpdateSeq}; | |
+                Parent = self(), | |
+                % Append in a monitored helper process; the pointer comes back | |
+                % as a flush_result message collected by flush_result/1. | |
+                {FlushPid, FlushRef} = spawn_monitor(fun() -> | |
+                    {ok, NewSummaryPointer} = | |
+                        case Header#db_header.disk_version < 4 of | |
+                        true -> | |
+                            couch_file:append_binary(Fd, BodyBin); | |
+                        false -> | |
+                            couch_file:append_binary_md5(Fd, BodyBin) | |
+                        end, | |
+                    Parent ! {flush_result, self(), {ok, NewSummaryPointer}} end), | |
+                [{FlushPid, FlushRef}|Acc]; | |
_ -> | |
- Value | |
+                Acc | |
end | |
- end, Unflushed), | |
- flush_trees(Db, RestUnflushed, [InfoUnflushed#full_doc_info{rev_tree=Flushed} | AccFlushed]). | |
- | |
+        end, [], RevTree), | |
+    % The fold prepends, so TreeFlushRefs is in reverse traversal order. Put | |
+    % this tree's refs back into traversal order before stacking them in front | |
+    % of the earlier documents' refs, so the mapfoldl pass above pairs every | |
+    % node with the writer that was spawned for it. | |
+    AccFlushRefs2 = lists:reverse(TreeFlushRefs, AccFlushRefs), | |
+    flush_trees(Db, RestUnflushed, [InfoUnflushed | AccDocInfo], AccFlushRefs2). | |
send_result(Client, Id, OriginalRevs, NewResult) -> | |
% used to send a result to the client | |
@@ -605,7 +626,7 @@ update_docs_int(Db, DocsList, NonRepDocs, MergeConflicts, FullCommit) -> | |
% Write out the document summaries (the bodies are stored in the nodes of | |
% the trees, the attachments are already written to disk) | |
- {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, []), | |
+ {ok, FlushedFullDocInfos} = flush_trees(Db2, NewFullDocInfos, [], []), | |
{IndexFullDocInfos, IndexDocInfos} = | |
new_index_entries(FlushedFullDocInfos, [], []), | |
diff --git a/src/couchdb/couch_key_tree.erl b/src/couchdb/couch_key_tree.erl | |
index 4fe09bf..ad4d092 100644 | |
--- a/src/couchdb/couch_key_tree.erl | |
+++ b/src/couchdb/couch_key_tree.erl | |
@@ -13,7 +13,7 @@ | |
-module(couch_key_tree). | |
-export([merge/2, find_missing/2, get_key_leafs/2, get_full_key_paths/2, get/2]). | |
--export([map/2, get_all_leafs/1, count_leafs/1, remove_leafs/2, | |
+-export([map/2, foldl/3, mapfoldl/3, get_all_leafs/1, count_leafs/1, remove_leafs/2, | |
get_all_leafs_full/1,stem/2,map_leafs/2]). | |
% a key tree looks like this: | |
@@ -294,6 +294,33 @@ map_simple(Fun, Pos, [{Key, Value, SubTree} | RestTree]) -> | |
if SubTree == [] -> leaf; true -> branch end), | |
[{Key, Value2, map_simple(Fun, Pos + 1, SubTree)} | map_simple(Fun, Pos, RestTree)]. | |
+%% Folds Fun over every node of every tree in a key tree list, threading an | |
+%% accumulator. Fun is called as Fun({Pos, Key}, Value, Acc) and returns the | |
+%% next accumulator; the final accumulator is returned. | |
+foldl(Fun, Acc0, Trees) -> | |
+    lists:foldl( | |
+        fun({Pos, Tree}, Acc) -> foldl_simple(Fun, Acc, Pos, [Tree]) end, | |
+        Acc0, Trees). | |
+ | |
+%% Folds Fun over a list of sibling nodes that all sit at position Pos. | |
+%% Each node is visited first, then its subtree (at Pos + 1), then the | |
+%% remaining siblings. | |
+foldl_simple(Fun, Acc0, Pos, Nodes) -> | |
+    lists:foldl( | |
+        fun({Key, Value, SubTree}, Acc) -> | |
+            foldl_simple(Fun, Fun({Pos, Key}, Value, Acc), Pos + 1, SubTree) | |
+        end, Acc0, Nodes). | |
+ | |
+%% Maps and folds Fun over every node of every tree in a key tree list. | |
+%% Fun is called as Fun({Pos, Key}, Value, Acc) and returns {NewValue, NewAcc}. | |
+%% Returns {NewTrees, FinalAcc}; NewTrees has the same shape as the input. | |
+mapfoldl(Fun, Acc0, Trees) -> | |
+    lists:mapfoldl( | |
+        fun({Pos, Tree}, Acc) -> | |
+            {[NewTree], Acc2} = mapfoldl_simple(Fun, Acc, Pos, [Tree]), | |
+            {{Pos, NewTree}, Acc2} | |
+        end, Acc0, Trees). | |
+ | |
+%% Maps and folds Fun over a list of sibling nodes that all sit at position | |
+%% Pos. Each node is visited first, then its subtree, then the remaining | |
+%% siblings. Returns {NewNodes, FinalAcc}. | |
+mapfoldl_simple(_Fun, Acc, _Pos, []) -> | |
+    {[], Acc}; | |
+mapfoldl_simple(Fun, Acc, Pos, [{Key, Value, SubTree} | RestTree]) -> | |
+    {Value2, Acc2} = Fun({Pos, Key}, Value, Acc), | |
+    % Children live one revision position deeper than their parent, as in | |
+    % foldl_simple/4 and map_simple/3. | |
+    {NewSubTree, Acc3} = mapfoldl_simple(Fun, Acc2, Pos + 1, SubTree), | |
+    {NewRestTree, Acc4} = mapfoldl_simple(Fun, Acc3, Pos, RestTree), | |
+    {[{Key, Value2, NewSubTree} | NewRestTree], Acc4}. | |
map_leafs(_Fun, []) -> | |
[]; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment