Skip to content

Instantly share code, notes, and snippets.

@weaversam8
Created November 3, 2023 17:47
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save weaversam8/c77fe1df5749d6eaa459d9e1b6f61d9d to your computer and use it in GitHub Desktop.
Save weaversam8/c77fe1df5749d6eaa459d9e1b6f61d9d to your computer and use it in GitHub Desktop.
Cowboy Stream Handler Reference Implementation

Cowboy, the popular Erlang web server, has deprecated middleware and replaced the concept with stream handlers, a more flexible, but more complicated API. This page contains a documented reference implementation of a stream handler, to help others when developing their stream handlers in the future.

-module(reference_stream_h).
-behavior(cowboy_stream).
-export([init/3]).
-export([data/4]).
-export([info/3]).
-export([terminate/3]).
-export([early_error/5]).
-record(state, {next}).
%%
%% Callback Functions
%%
% This implementation was modeled from imetrics_cowboy_stream_h.
% This is a ready-to-use template that other stream handlers can be
% based on.
% This function is called when the request is first received and the
% headers are parsed and known.
-spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts())
-> {cowboy_stream:commands(), #state{}}.
init(StreamID, Req = #{ path := Path }, Opts) ->
% The state record here holds our "Next" tuple, which contains a reference
% to ourselves (module) and a state maintained by the cowboy_stream module.
% It can be passed around opaquely.
State0 = #state{},
% When we're ready to invoke the next handler in the chain, we call
% cowboy_stream:init/3. It returns that "Next" tuple and a list of "commands"
% to send to the upstream HTTP process. Those commands are documented [here][0]
%
% [0]: https://ninenines.eu/docs/en/cowboy/2.8/manual/cowboy_stream/#commands
{Commands0, Next} = cowboy_stream:init(StreamID, Req, Opts),
% We pass these commands thru our "fold" function, which allows us to iterate
% thru each command and modify it if necessary. We can also add new commands
% here or in that function if we want to. Ultimately, the list of commands is
% returned by this function, and passed to the next handler upstream (or cowboy,
% which passes them to the client, if this is the last handler.)
fold(Commands0, State0#state{next=Next}).
% This function is called with the data sent in the request body. It can be called in
% chunks, so make sure to check the "IsFin" flag to decide whether you've received
% all the data.
-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State)
-> {cowboy_stream:commands(), State} when State::#state{}.
data(StreamID, IsFin, Data, State0=#state{next=Next0}) ->
% Invoke the next stream handler in the chain.
{Commands0, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0),
% Process any commands returned by downstream stream handlers.
fold(Commands0, State0#state{next=Next}).
% Any messages addressed to the stream will be sent to this function. Cowboy also uses
% this function to inform stream handlers of internal events.
-spec info(cowboy_stream:streamid(), any(), State)
-> {cowboy_stream:commands(), State} when State::#state{}.
info(StreamID, Info, State0=#state{next=Next0}) ->
% Invoke the next stream handler in the chain.
{Commands0, Next} = cowboy_stream:info(StreamID, Info, Next0),
% Process any commands returned by downstream stream handlers.
fold(Commands0, State0#state{next=Next}).
% All streams will eventually be terminated and this function will be called. The
% only time when terminate/3 is not called is when an error occurs in init/3 of
% any stream handler, since the state is not available.
-spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> any().
terminate(StreamID, Reason, #state{next=Next}) ->
% Propagate the terminate to downstream stream handlers.
cowboy_stream:terminate(StreamID, Reason, Next).
% This function will be called when an error occurs before the request-line and
% all headers have been received in an HTTP/1.1 request. It includes the partial
% request, and the response Cowboy intends to send. The return value of this
% function is the response, so it allows you to modify it if necessary.
-spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(),
cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp
when Resp::cowboy_stream:resp_command().
early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
% Propagate the error to downstream stream handlers.
cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).
%%
%% Internal Functions
%%
% Any commands (see [0]) returned from downstream handlers are passed thru this
% fold function, giving us the chance to modify, omit, replace, or append to them.
%
% This module transparently passes the commands thru without any changes, but this
% logic remains in place since this is the best documented stream handler in all of
% our codebases, and it should exist as a reference for other stream handlers to hook
% into in the future.
%
% [0]: https://ninenines.eu/docs/en/cowboy/2.8/manual/cowboy_stream/#commands
fold(Commands, State) ->
fold(Commands, State, []).
% At the end of the recursion, reverse the accumulator so the original order of
% commands is preserved.
fold([], State, Acc) ->
{lists:reverse(Acc), State};
% You can add additional cases here to catch specific commands.
% fold([{response, ResponseCode, Headers, Body0}|Tail], State0, Acc) ->
% % do something with the body
% Body = do_something(Body0),
% Response = {response, ResponseCode, Headers, Body},
% fold(Tail, State0, [Response|Acc]);
% This is the catch-all case for any commands we didn't match above, pass it
% thru unmodified.
fold([Command|Tail], State, Acc) ->
fold(Tail, State, [Command|Acc]).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment