Skip to content

Instantly share code, notes, and snippets.

@janl
Last active February 8, 2018 17:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save janl/acb1103de4d29506c5c27c1c0017266a to your computer and use it in GitHub Desktop.
Save janl/acb1103de4d29506c5c27c1c0017266a to your computer and use it in GitHub Desktop.
diff --git a/src/couch/src/couch_httpd_multipart.erl b/src/couch/src/couch_httpd_multipart.erl
index 6ce3c76fe..3bf040220 100644
--- a/src/couch/src/couch_httpd_multipart.erl
+++ b/src/couch/src/couch_httpd_multipart.erl
@@ -23,16 +23,21 @@
-include_lib("couch/include/couch_db.hrl").
+-define(DEFAULT_PARSING_TIMEOUT, 300000). % 5 minutes
+
decode_multipart_stream(ContentType, DataFun, Ref) ->
Parent = self(),
NumMpWriters = num_mp_writers(),
+ ParsingTimeout = config:get_integer("couchdb",
+ "multipart_parsing_timeout", ?DEFAULT_PARSING_TIMEOUT),
+ Options = [{parsing_timeout, ParsingTimeout}],
{Parser, ParserRef} = spawn_monitor(fun() ->
ParentRef = erlang:monitor(process, Parent),
put(mp_parent_ref, ParentRef),
num_mp_writers(NumMpWriters),
{<<"--",_/binary>>, _, _} = couch_httpd:parse_multipart_request(
ContentType, DataFun,
- fun(Next) -> mp_parse_doc(Next, []) end),
+ fun(Next) -> mp_parse_doc(Next, [], Options) end),
unlink(Parent)
end),
Parser ! {get_doc_bytes, Ref, self()},
@@ -55,56 +60,64 @@ decode_multipart_stream(ContentType, DataFun, Ref) ->
end.
-mp_parse_doc({headers, H}, []) ->
+mp_parse_doc({headers, H}, [], Options) ->
case couch_util:get_value("content-type", H) of
{"application/json", _} ->
fun (Next) ->
- mp_parse_doc(Next, [])
+ mp_parse_doc(Next, [], Options)
end;
_ ->
throw({bad_ctype, <<"Content-Type must be application/json">>})
end;
-mp_parse_doc({body, Bytes}, AccBytes) ->
+mp_parse_doc({body, Bytes}, AccBytes, Options) ->
fun (Next) ->
- mp_parse_doc(Next, [Bytes | AccBytes])
+ mp_parse_doc(Next, [Bytes | AccBytes], Options)
end;
-mp_parse_doc(body_end, AccBytes) ->
+mp_parse_doc(body_end, AccBytes, Options) ->
+ ParsingTimeout = proplists:get_value(parsing_timeout, Options),
receive {get_doc_bytes, Ref, From} ->
- From ! {doc_bytes, Ref, lists:reverse(AccBytes)}
- end,
- fun(Next) ->
- mp_parse_atts(Next, {Ref, [], 0, orddict:new(), []})
+ From ! {doc_bytes, Ref, lists:reverse(AccBytes)},
+ fun(Next) ->
+ mp_parse_atts(Next, {Ref, [], 0, orddict:new(), []}, Options)
+ end
+ after ParsingTimeout ->
+ couch_log:error("Multipart parsing timeout of ~p reached in mp_parse_doc, exiting.",
+ [ParsingTimeout]),
+ exit({normal, timeout})
end.
-mp_parse_atts({headers, _}, Acc) ->
- fun(Next) -> mp_parse_atts(Next, Acc) end;
-mp_parse_atts(body_end, Acc) ->
- fun(Next) -> mp_parse_atts(Next, Acc) end;
-mp_parse_atts({body, Bytes}, {Ref, Chunks, Offset, Counters, Waiting}) ->
- case maybe_send_data({Ref, Chunks++[Bytes], Offset, Counters, Waiting}) of
+mp_parse_atts({headers, _}, Acc, Options) ->
+ fun(Next) -> mp_parse_atts(Next, Acc, Options) end;
+mp_parse_atts(body_end, Acc, Options) ->
+ fun(Next) -> mp_parse_atts(Next, Acc, Options) end;
+mp_parse_atts({body, Bytes}, {Ref, Chunks, Offset, Counters, Waiting}, Options) ->
+ case maybe_send_data({Ref, Chunks++[Bytes], Offset, Counters, Waiting}, Options) of
abort_parsing ->
fun(Next) -> mp_abort_parse_atts(Next, nil) end;
NewAcc ->
- fun(Next) -> mp_parse_atts(Next, NewAcc) end
+ fun(Next) -> mp_parse_atts(Next, NewAcc, Options) end
end;
-mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}) ->
+mp_parse_atts(eof, {Ref, Chunks, Offset, Counters, Waiting}, Options) ->
N = num_mp_writers(),
M = length(Counters),
case (M == N) andalso Chunks == [] of
true ->
ok;
false ->
+ ParsingTimeout = proplists:get_value(parsing_timeout, Options),
ParentRef = get(mp_parent_ref),
receive
abort_parsing ->
ok;
{get_bytes, Ref, From} ->
C2 = orddict:update_counter(From, 1, Counters),
- NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}),
- mp_parse_atts(eof, NewAcc);
+ NewAcc = maybe_send_data({Ref, Chunks, Offset, C2, [From|Waiting]}, Options),
+ mp_parse_atts(eof, NewAcc, Options);
{'DOWN', ParentRef, _, _, _} ->
exit(mp_reader_coordinator_died)
- after 3600000 ->
+ after ParsingTimeout ->
+ couch_log:info("Multipart parsing timeout of ~p reached in mp_parse_atts, ignoring.",
+ [ParsingTimeout]),
ok
end
end.
@@ -114,10 +127,10 @@ mp_abort_parse_atts(eof, _) ->
mp_abort_parse_atts(_, _) ->
fun(Next) -> mp_abort_parse_atts(Next, nil) end.
-maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
+maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}, Options) ->
receive {get_bytes, Ref, From} ->
NewCounters = orddict:update_counter(From, 1, Counters),
- maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]})
+ maybe_send_data({Ref, Chunks, Offset, NewCounters, [From|Waiting]}, Options)
after 0 ->
% reply to as many writers as possible
NewWaiting = lists:filter(fun(Writer) ->
@@ -154,6 +167,7 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
% someone has written all possible chunks, keep moving
{Ref, NewChunks, NewOffset, Counters, NewWaiting};
true ->
+ ParsingTimeout = proplists:get_value(parsing_timeout, Options),
ParentRef = get(mp_parent_ref),
receive
abort_parsing ->
@@ -162,7 +176,11 @@ maybe_send_data({Ref, Chunks, Offset, Counters, Waiting}) ->
exit(mp_reader_coordinator_died);
{get_bytes, Ref, X} ->
C2 = orddict:update_counter(X, 1, Counters),
- maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]})
+ maybe_send_data({Ref, NewChunks, NewOffset, C2, [X|NewWaiting]}, Options)
+ after ParsingTimeout ->
+ couch_log:error("Multipart parsing timeout of ~p reached in maybe_send_data, exiting.",
+ [ParsingTimeout]),
+ exit({normal, timeout})
end
end
end.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment