Last active
January 19, 2018 17:01
-
-
Save davisp/27cd7ab54cdffeaa6e96590df4f988f9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl | |
index 6ce3c76fe..f38cef8b5 100644 | |
--- a/src/couch/src/couch_httpd_multipart.erl | |
+++ b/src/couch/src/couch_httpd_multipart.erl | |
@@ -99,12 +99,24 @@ mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) -> | |
abort_parsing -> | |
ok; | |
{get_bytes, Ref, From} -> | |
- C2 = orddict:update_counter(From, 1, Counters), | |
- NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}), | |
- mp_parse_atts(eof, NewAcc); | |
+ C2 = update_writer(From, Counters), | |
+ case maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}) of | |
+ abort_parsing -> | |
+ ok; | |
+ NewAcc -> | |
+ mp_parse_atts(eof, NewAcc) | |
+ end; | |
{'DOWN', ParentRef, _, _, _} -> | |
- exit(mp_reader_coordinator_died) | |
- after 3600000 -> | |
+ exit(mp_reader_coordinator_died); | |
+ {'DOWN', WriterRef, _, WriterPid, _} -> | |
+ case remove_writer(WriterPid, WriterRef, Counters) of | |
+ abort_parsing -> | |
+ ok; | |
+ C2 -> | |
+ NewAcc = {Ref, Chunks, Offset, C2, Waiting -- [WriterPid]}, | |
+ mp_parse_atts(eof, NewAcc) | |
+ end | |
+ after 300000 -> | |
ok | |
end | |
end. | |
@@ -116,12 +128,12 @@ mp_abort_parse_atts(_, _) -> | |
maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> | |
receive {get_bytes, Ref, From} -> | |
- NewCounters = orddict:update_counter(From, 1, Counters), | |
+ NewCounters = update_writer(From, Counters), | |
maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]}) | |
after 0 -> | |
% reply to as many writers as possible | |
NewWaiting = lists:filter(fun(Writer) -> | |
- WhichChunk = orddict:fetch(Writer, Counters), | |
+ {_, WhichChunk} = orddict:fetch(Writer, Counters), | |
ListIndex = WhichChunk - Offset, | |
if ListIndex =< length(Chunks) -> | |
Writer ! {bytes, Ref, lists:nth(ListIndex, Chunks)}, | |
@@ -160,14 +172,55 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> | |
abort_parsing; | |
{'DOWN', ParentRef, _, _, _} -> | |
exit(mp_reader_coordinator_died); | |
+ {'DOWN', WriterRef, _, WriterPid, _} -> | |
+ case remove_writer(WriterPid, WriterRef, Counters) of | |
+ abort_parsing -> | |
+ abort_parsing; | |
+ C2 -> | |
+ RestWaiting = NewWaiting -- [WriterPid], | |
+ NewAcc = {Ref, NewChunks, NewOffset, C2, RestWaiting}, | |
+ maybe_send_data(NewAcc) | |
+ end; | |
{get_bytes, Ref, X} -> | |
- C2 = orddict:update_counter(X, 1, Counters), | |
+ C2 = update_writer(X, Counters), | |
maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]}) | |
+ after 300000 -> | |
+ abort_parsing | |
end | |
end | |
end. | |
+update_writer(WriterPid, Counters) -> | |
+ UpdateFun = fun({WriterRef, Count}) -> {WriterRef, Count + 1} end, | |
+ InitialValue = case orddict:find(WriterPid, Counters) of | |
+ {ok, IV} -> | |
+ IV; | |
+ error -> | |
+ WriterRef = erlang:monitor(process, WriterPid), | |
+ {WriterRef, 1} | |
+ end, | |
+ orddict:update(WriterPid, UpdateFun, InitialValue, Counters). | |
+ | |
+ | |
+remove_writer(WriterPid, WriterRef, Counters) -> | |
+ case orddict:find(WriterPid, Counters) of | |
+ {ok, {WriterRef, _}} -> | |
+ case num_mp_writers() of | |
+ N when N > 1 -> | |
+ num_mp_writers(N - 1); | |
+ _ -> | |
+ abort_parsing | |
+ end; | |
+ {ok, _} -> | |
+ % We got a different ref fired for a known worker | |
+ abort_parsing; | |
+ error -> | |
+ % Unknown worker pid? | |
+ abort_parsing | |
+ end. | |
+ | |
+ | |
num_mp_writers(N) -> | |
erlang:put(mp_att_writers, N). | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment