diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl | |
index 6ce3c76fe..3bf040220 100644 | |
--- a/src/couch/src/couch_httpd_multipart.erl | |
+++ b/src/couch/src/couch_httpd_multipart.erl | |
@@ -23,16 +23,21 @@ | |
-include_lib("couch/include/couch_db.hrl"). | |
+-define(DEFAULT_PARSING_TIMEOUT, 300000). % 5 minutes | |
+ | |
+%% Spawns a monitored parser process that runs | |
+%% couch_httpd:parse_multipart_request/3 over the request body, then asks it | |
+%% for the document bytes. The receive timeout used by the parsing callbacks | |
+%% is read from the config once here and passed to them in Options. | |
decode_multipart_stream(ContentType, DataFun, Ref) -> | |
Parent = self(), | |
NumMpWriters = num_mp_writers(), | |
+ % Read the value as an integer: config:get/3 hands back the configured | |
+ % string, which is not a valid `receive ... after` timeout. | |
+ ParsingTimeout = config:get_integer("couchdb", | |
+ "multipart_parsing_timeout", ?DEFAULT_PARSING_TIMEOUT), | |
+ Options = [{parsing_timeout, ParsingTimeout}], | |
{Parser, ParserRef} = spawn_monitor(fun() -> | |
ParentRef = erlang:monitor(process, Parent), | |
put(mp_parent_ref, ParentRef), | |
num_mp_writers(NumMpWriters), | |
{<<"--",_/binary>>, _, _} = couch_httpd:parse_multipart_request( | |
ContentType, DataFun, | |
- fun(Next) -> mp_parse_doc(Next, []) end), | |
+ fun(Next) -> mp_parse_doc(Next, [], Options) end), | |
unlink(Parent) | |
end), | |
Parser ! {get_doc_bytes, Ref, self()}, | |
@@ -55,56 +60,64 @@ decode_multipart_stream(ContentType, DataFun, Ref) -> | |
end. | |
-mp_parse_doc({headers, H}, []) -> | |
+%% Multipart callback for the leading JSON document part. | |
+%% {headers, H}  - requires an application/json content type, otherwise | |
+%%                 throws {bad_ctype, _}. | |
+%% {body, Bytes} - prepends Bytes to the accumulator. | |
+%% body_end      - waits for {get_doc_bytes, Ref, From}, replies with the | |
+%%                 accumulated chunks in arrival order and continues with | |
+%%                 mp_parse_atts/3. Exits if the parent monitor fires or if | |
+%%                 no request arrives within the parsing_timeout option. | |
+mp_parse_doc({headers, H}, [], Options) -> | |
case couch_util:get_value("content-type", H) of | |
{"application/json", _} -> | |
fun (Next) -> | |
- mp_parse_doc(Next, []) | |
+ mp_parse_doc(Next, [], Options) | |
end; | |
_ -> | |
throw({bad_ctype, <<"Content-Type must be application/json">>}) | |
end; | |
-mp_parse_doc({body, Bytes}, AccBytes) -> | |
+mp_parse_doc({body, Bytes}, AccBytes, Options) -> | |
fun (Next) -> | |
- mp_parse_doc(Next, [Bytes | AccBytes]) | |
+ mp_parse_doc(Next, [Bytes | AccBytes], Options) | |
end; | |
-mp_parse_doc(body_end, AccBytes) -> | |
+mp_parse_doc(body_end, AccBytes, Options) -> | |
+ ParsingTimeout = proplists:get_value(parsing_timeout, Options), | |
+ % Same parent monitor the other blocking receives in this module watch. | |
+ ParentRef = get(mp_parent_ref), | |
receive {get_doc_bytes, Ref, From} -> | |
- From ! {doc_bytes, Ref, lists:reverse(AccBytes)} | |
- end, | |
- fun(Next) -> | |
- mp_parse_atts(Next, {Ref, [], 0, orddict:new(), []}) | |
+ From ! {doc_bytes, Ref, lists:reverse(AccBytes)}, | |
+ fun(Next) -> | |
+ mp_parse_atts(Next, {Ref, [], 0, orddict:new(), []}, Options) | |
+ end; | |
+ {'DOWN', ParentRef, _, _, _} -> | |
+ % Nobody is left to ask for the document; stop now instead of | |
+ % waiting for the timeout. | |
+ exit(mp_reader_coordinator_died) | |
+ after ParsingTimeout -> | |
+ couch_log:error("Multipart parsing timeout of ~p reached in mp_parse_doc, exiting.", | |
+ [ParsingTimeout]), | |
+ exit({normal, timeout}) | |
end. | |
-mp_parse_atts({headers, _}, Acc) -> | |
- fun(Next) -> mp_parse_atts(Next, Acc) end; | |
-mp_parse_atts(body_end, Acc) -> | |
- fun(Next) -> mp_parse_atts(Next, Acc) end; | |
-mp_parse_atts({body, Bytes}, {Ref, Chunks, Offset, Counters, Waiting}) -> | |
- case maybe_send_data({Ref, Chunks++[Bytes], Offset, Counters, Waiting}) of | |
+%% Multipart callback for the attachment parts. | |
+%% {headers, _} / body_end - keep the accumulator and continue. | |
+%% {body, Bytes}           - append Bytes to the pending chunks and pass the | |
+%%                           accumulator to maybe_send_data/2; switch to | |
+%%                           mp_abort_parse_atts/2 when it returns | |
+%%                           abort_parsing. | |
+%% eof                     - returns ok once Counters has num_mp_writers() | |
+%%                           entries and no chunks remain; otherwise keeps | |
+%%                           serving get_bytes requests until abort_parsing, | |
+%%                           parent death or the parsing_timeout option. | |
+mp_parse_atts({headers, _}, Acc, Options) -> | |
+ fun(Next) -> mp_parse_atts(Next, Acc, Options) end; | |
+mp_parse_atts(body_end, Acc, Options) -> | |
+ fun(Next) -> mp_parse_atts(Next, Acc, Options) end; | |
+mp_parse_atts({body, Bytes}, {Ref, Chunks, Offset, Counters, Waiting}, Options) -> | |
+ case maybe_send_data({Ref, Chunks++[Bytes], Offset, Counters, Waiting}, Options) of | |
abort_parsing -> | |
fun(Next) -> mp_abort_parse_atts(Next, nil) end; | |
NewAcc -> | |
- fun(Next) -> mp_parse_atts(Next, NewAcc) end | |
+ fun(Next) -> mp_parse_atts(Next, NewAcc, Options) end | |
end; | |
-mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) -> | |
+mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}, Options) -> | |
N = num_mp_writers(), | |
M = length(Counters), | |
case (M == N) andalso Chunks == [] of | |
true -> | |
ok; | |
false -> | |
+ ParsingTimeout = proplists:get_value(parsing_timeout, Options), | |
ParentRef = get(mp_parent_ref), | |
receive | |
abort_parsing -> | |
ok; | |
{get_bytes, Ref, From} -> | |
C2 = orddict:update_counter(From, 1, Counters), | |
- NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}), | |
- mp_parse_atts(eof, NewAcc); | |
+ % maybe_send_data/2 can return abort_parsing (handled in the | |
+ % {body, _} clause above); stop here rather than recursing with an | |
+ % accumulator that matches no clause. | |
+ case maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}, Options) of | |
+ abort_parsing -> | |
+ ok; | |
+ NewAcc -> | |
+ mp_parse_atts(eof, NewAcc, Options) | |
+ end; | |
{'DOWN', ParentRef, _, _, _} -> | |
exit(mp_reader_coordinator_died) | |
- after 3600000 -> | |
+ after ParsingTimeout -> | |
+ couch_log:info("Multipart parsing timeout of ~p reached in mp_parse_atts, ignoring.", | |
+ [ParsingTimeout]), | |
ok | |
end | |
end. | |
@@ -114,10 +127,10 @@ mp_abort_parse_atts(eof, _) -> | |
mp_abort_parse_atts(_, _) -> | |
fun(Next) -> mp_abort_parse_atts(Next, nil) end. | |
-maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> | |
+maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}, Options) -> | |
receive {get_bytes, Ref, From} -> | |
NewCounters = orddict:update_counter(From, 1, Counters), | |
- maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]}) | |
+ maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]}, Options) | |
after 0 -> | |
% reply to as many writers as possible | |
NewWaiting = lists:filter(fun(Writer) -> | |
@@ -154,6 +167,7 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> | |
% someone has written all possible chunks, keep moving | |
{Ref, NewChunks, NewOffset, Counters, NewWaiting}; | |
true -> | |
+ ParsingTimeout = proplists:get_value(parsing_timeout, Options), | |
ParentRef = get(mp_parent_ref), | |
receive | |
abort_parsing -> | |
@@ -162,7 +176,11 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) -> | |
exit(mp_reader_coordinator_died); | |
{get_bytes, Ref, X} -> | |
C2 = orddict:update_counter(X, 1, Counters), | |
- maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]}) | |
+ maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]}, Options) | |
+ after ParsingTimeout -> | |
+ couch_log:error("Multipart parsing timeout of ~p reached in maybe_send_data, exiting.", | |
+ [ParsingTimeout]), | |
+ exit({normal, timeout}) | |
end | |
end | |
end. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment