Skip to content

Instantly share code, notes, and snippets.

@eproxus
Last active March 21, 2024 12:06
Show Gist options
  • Save eproxus/c5d41cc6ba047e80b0379a69b4a6f67b to your computer and use it in GitHub Desktop.
Save eproxus/c5d41cc6ba047e80b0379a69b4a6f67b to your computer and use it in GitHub Desktop.
How to Create a Cowboy Stream Handler

How to Create a Cowboy Stream Handler

Add your stream handler to the list of stream handlers in the protocol options:

{ok, Pid} = cowboy:start_clear(ListenerName,
    [{port, Port}],
    #{
        env => #{dispatch => Dispatch},
        stream_handlers => [my_stream_h, cowboy_compress_h, cowboy_stream_h]
    }
),

A stream handler

  • should implement the behavior cowboy_stream.
  • inserts itself into the request/response stack and can modify both incoming requests and outgoing repsonses

Examples

-module(my_stream_h).
-behaviour(cowboy_stream).
% Callbacks
-export([init/3]).
-export([data/4]).
-export([info/3]).
-export([terminate/3]).
-export([early_error/5]).
%--- Callbacks -----------------------------------------------------------------
init(StreamID, Req, Opts) ->
logger:info("[~p] begin~n", [StreamID]),
{Commands0, Next} = cowboy_stream:init(StreamID, Req, Opts),
{Commands0, #{next => Next}}.
data(StreamID, IsFin, Data, #{next := Next0} = State0) ->
logger:info("[~p] data [~p] ~p~n", [StreamID, IsFin, Data]),
{Commands0, Next1} = cowboy_stream:data(StreamID, IsFin, Data, Next0),
{Commands0, State0#{next => Next1}}.
info(StreamID, Info, #{next := Next0} = State0) ->
logger:info("[~p] info ~p~n", [StreamID, Info]),
{Commands0, Next1} = cowboy_stream:info(StreamID, Info, Next0),
{Commands0, State0#{next => Next1}}.
terminate(StreamID, Reason, #{next := Next0}) ->
logger:info("[~p] terminate ~p~n", [StreamID, Reason]),
cowboy_stream:terminate(StreamID, Reason, Next0).
early_error(StreamID, Reason, PartialReq, Resp, Opts) ->
logger:error("[~p] early error ~p ~p ~p ~p~n", [StreamID, Reason, PartialReq, Resp, Opts]),
cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts).
@llucio
Copy link

llucio commented Sep 9, 2022

Was trying to convert this to elixir and failed miserably, can you take a look at this:

defmodule CompanySearchWeb.CowboyLoggingHandler do
  require Logger
  @behaviour :cowboy_stream

  @impl :cowboy_stream
  def init(StreamID, Req, Opts) do
    :cowboy_stream.init(StreamID, Req, Opts)
  end

  @impl :cowboy_stream
  def info(StreamID, Info, State) do
    :cowboy_stream.info(StreamID, Info, State)
  end

  @impl :cowboy_stream
  def data(StreamID, IsFin, Data, State) do
    :cowboy_stream.data(StreamID, IsFin, Data, State)
  end

  @impl :cowboy_stream
  def terminate(StreamID, Reason, State) do
	case Reason !== :normal do
	  true -> Logger.error(%{"StreamID" => StreamID, "Reason" => Reason, "State" => State})
	  false -> :ok
	end
	:cowboy_stream.terminate(StreamID, Reason, State)
  end

  @impl :cowboy_stream
  def early_error(StreamID, Reason, PartialReq, Resp, Opts) do
    Logger.error("[~p] early error ~p ~p ~p ~p~n", [StreamID, Reason, PartialReq, Resp, Opts])
    :cowboy_stream.early_error(StreamID, Reason, PartialReq, Resp, Opts)
  end
end

@eproxus
Copy link
Author

eproxus commented Sep 12, 2022

@llucio Failed in what way?

@hmmr
Copy link

hmmr commented Mar 6, 2024

@eproxus Can you elaborate on what your Dispatch would be? I have cobbled together a stream handler for my needs, which works at least for POSTing a file, but invariably returns 400 -- I suspect because my Dispatch is empty. If I add a rule like {"/[...]", my_stream_h, #{}}, I get an error:undef (because my stream handler, obviously, has an init different from init cowboy wants to call). And if I replace my_stream_h with some non-existing module, streaming does happen but I get a 404.

In other words, how do I mention my_handler_h in the routing dispatch table?

@hmmr
Copy link

hmmr commented Mar 7, 2024

Answering to myself, the routes should have a separate, non-streaming handler module, like so:

    Dispatch =
        cowboy_router:compile(
          [{'_', [{"/[...]", nonstreaming_h, []}]}]
         ),

with just init/2 in it (note that cowboy_loop):

-module(nonstreaming_h).

-export([init/2]).

init(Req0 = #{method := <<"GET">>}, Opts) ->
    Req = cowboy_req:stream_reply(200, Req0),
    {cowboy_loop, Req, Opts};

init(Req0 = #{method := <<"POST">>}, Opts) ->
    Req = cowboy_req:reply(200, Req0),
    {cowboy_loop, Req, Opts}.

That seems to be sufficient to set up the state for streaming, which is to be done via cowboy_req:cast/2 and data/4 in your streaming_h.erl.

@eproxus
Copy link
Author

eproxus commented Mar 21, 2024

@hmmr Hi! The terms used by Cowboy to describe these things are very confusing indeed. Almost everything is called a handler (even middleware are called handlers in the source code!).

Here are the differences as I've understood them:

  • A stream handler runs in a stream process (per request) and can be chained together with other stream handlers. They see both the incoming data and any outgoing response, together with any message/event sent to the stream handler process.
    • From the stream handler a request process is started (so Cowboy actually runs two processes per request). In that process, two types of customizable behavior is executed:
      • The middlewares are run in sequence (by default, the built-in Cowboy router middleware and the default handler middleware). These start your own request handlers based on the routing dispatch table
      • Your own request handler. Any response here (or earlier, e.g. in the router middleware that can return a 404 in case no route exists) cuts short the stack of middleware and sends the response back up to the stream handlers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment