@t2ym
Last active April 30, 2022 04:28
nicexprs.h: a single-header, ExpressJs-like middleware API on top of the nghttp2 asio library
/*
* Copyright (c) 2021, Tetsuya Mori <t2y3141592@gmail.com>
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
/*
nicexprs.h: a single-header, ExpressJs-like middleware API on top of the nghttp2 asio library
# Dependencies
- nghttp2 asio library and its dependencies (boost, openssl, etc.)
- nghttp2 must be configured with --enable-asio-lib
# Features
- HTTP2 server middleware framework on top of the nghttp2 asio library
- No changes to the base nghttp2 asio library
- Similar to ExpressJs API
- Compatible with C++14 and later
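A minimal, self-contained sketch of the ExpressJs-like chaining idea: each middleware receives the request and the response and decides whether to pass control on by calling next(). The toy_request, toy_response, and toy_app types are hypothetical stand-ins for illustration only; they are not the nicexprs or nghttp2 API.

```cpp
#include <cstddef>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins, not the nicexprs types.
struct toy_request { std::string path; };
struct toy_response { std::string body; };

class toy_app {
public:
  using next_fn = std::function<void()>;
  using middleware = std::function<void(toy_request &, toy_response &, next_fn)>;

  // Register a middleware; handlers run in registration order.
  void use(middleware m) { chain_.push_back(std::move(m)); }

  // Run the chain starting from the first middleware.
  void handle(toy_request &req, toy_response &res) const { run(0, req, res); }

private:
  void run(std::size_t i, toy_request &req, toy_response &res) const {
    if (i >= chain_.size()) return; // end of chain
    // next() resumes the chain at the following middleware.
    chain_[i](req, res, [this, i, &req, &res] { run(i + 1, req, res); });
  }
  std::vector<middleware> chain_;
};

// Builds a three-stage chain and returns the response body for a path.
std::string run_demo(const std::string &path) {
  toy_app app;
  app.use([](toy_request &, toy_response &res, toy_app::next_fn next) {
    res.body += "[log]";
    next(); // pass control to the next middleware
  });
  app.use([](toy_request &req, toy_response &res, toy_app::next_fn next) {
    if (req.path == "/hello") {
      res.body += "hello"; // respond and stop the chain
    } else {
      next();
    }
  });
  app.use([](toy_request &, toy_response &res, toy_app::next_fn) {
    res.body += "404"; // fallback when nothing matched
  });
  toy_request req{path};
  toy_response res;
  app.handle(req, res);
  return res.body;
}
```

With these three handlers, run_demo("/hello") returns "[log]hello" and run_demo("/other") returns "[log]404".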
# TODOs
- TBD
*/
/*
# Change Log
To be converted to CHANGELOG.md in a dedicated project
## [Unreleased]
### Added
### Changed
### Removed
### Fixed
## [0.0.10] - 2021-05-18
### Added
- Support streaming filters if STREAMING_SUPPORT macro is defined as truthy
- response::on_response(response_cb on_header, stream_cb cb)
- register a streaming filter callback with its corresponding on-header callback
- If STREAMING_SUPPORT macro is undefined or 0, nicexprs.h is almost the same as the previous version 0.0.9
- Define std::iostream adaptor class generator_stream if GENERATOR_STREAM macro is defined as truthy
- Define boost_filtering_streambuf_adaptor class if BOOST_FILTER macro is defined as truthy
### Changed
- Change the type of response::response_filters from std::list<response_cb> to std::list<filter_item>
- filter_item contains either a response_cb for buffered filtering,
or a stream_cb for streamed filtering plus another response_cb for header filtering when streaming
### Removed
- Remove unimplemented web_app::static_file()
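To illustrate the streaming filter shape added in 0.0.10, here is a minimal sketch of a stream_cb-style filter that wraps a source generator and upper-cases the bytes passing through. It is self-contained and makes several assumptions: toy_generator mirrors the (buf, len, data_flags) generator signature with long standing in for ssize_t, TOY_FLAG_EOF is a hypothetical stand-in for NGHTTP2_DATA_FLAG_EOF, and the response parameter of stream_cb is omitted for brevity.

```cpp
#include <algorithm>
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins for nghttp2's generator_cb and NGHTTP2_DATA_FLAG_EOF.
using toy_generator =
    std::function<long(std::uint8_t *buf, std::size_t len, std::uint32_t *data_flags)>;
const std::uint32_t TOY_FLAG_EOF = 0x01;

// Source: emits the text in chunks and sets the EOF flag with the last chunk.
toy_generator make_source(std::string text) {
  auto data = std::make_shared<std::string>(std::move(text));
  auto offset = std::make_shared<std::size_t>(0);
  return [data, offset](std::uint8_t *buf, std::size_t len, std::uint32_t *data_flags) -> long {
    std::size_t n = std::min(len, data->size() - *offset);
    std::copy_n(data->data() + *offset, n, buf);
    *offset += n;
    if (*offset == data->size()) {
      *data_flags |= TOY_FLAG_EOF;
    }
    return static_cast<long>(n);
  };
}

// Filter: takes a source generator and returns a generator of upper-cased bytes.
toy_generator upper_filter(toy_generator src) {
  return [src](std::uint8_t *buf, std::size_t len, std::uint32_t *data_flags) -> long {
    long n = src(buf, len, data_flags); // pull from upstream; flags pass through
    for (long i = 0; i < n; ++i) {
      buf[i] = static_cast<std::uint8_t>(std::toupper(buf[i]));
    }
    return n; // negative values (errors) are returned unchanged
  };
}

// Sink: drains a generator into a string; the small buffer forces chunking.
std::string drain(toy_generator gen) {
  std::string out;
  std::uint8_t buf[4];
  for (;;) {
    std::uint32_t flags = 0;
    long n = gen(buf, sizeof(buf), &flags);
    if (n < 0) break; // error or deferral: this sketch simply stops
    out.append(reinterpret_cast<const char *>(buf), static_cast<std::size_t>(n));
    if (flags & TOY_FLAG_EOF) break;
  }
  return out;
}
```

Filters compose by nesting, for example drain(upper_filter(make_source("abc"))), which mirrors how end(generator_cb) in this header wraps the source generator with each registered filter in turn.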
## [0.0.9] - 2021-05-17
### Fixed
- Fix the segmentation fault issue on popping an item from an empty std::list<quadple>
## [0.0.8] - 2021-05-10
### Fixed
- Fix the regression issue of empty deferred response
## [0.0.7] - 2021-05-04
### Added
- Bypass response buffering if response filters are empty
## [0.0.6] - 2021-05-02
### Added
- Support multiple on_close callbacks: std::list<close_cb> response::on_close_callbacks, invoked in the reverse order of their registration
### Removed
- response::is_on_close_set
- response::on_close_
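The reverse invocation order follows from prepending each callback to the list. A minimal sketch, assuming a hypothetical toy_close_cb type in place of nghttp2's close_cb and a hypothetical fire_close() helper in place of the real close handling:

```cpp
#include <cstdint>
#include <functional>
#include <list>
#include <string>
#include <utility>

// Hypothetical stand-in for nghttp2's close_cb.
using toy_close_cb = std::function<void(std::uint32_t error_code)>;

struct toy_response {
  std::list<toy_close_cb> on_close_callbacks;

  // Prepend, so the most recently registered callback runs first.
  void on_close(toy_close_cb cb) { on_close_callbacks.push_front(std::move(cb)); }

  // Hypothetical helper: invoke every callback from front to back.
  void fire_close(std::uint32_t error_code) {
    for (auto &cb : on_close_callbacks) {
      cb(error_code);
    }
  }
};

// Registers callbacks "a", "b", "c" in that order and records the call order.
std::string close_order() {
  std::string order;
  toy_response res;
  res.on_close([&order](std::uint32_t) { order += "a"; });
  res.on_close([&order](std::uint32_t) { order += "b"; });
  res.on_close([&order](std::uint32_t) { order += "c"; });
  res.fire_close(0);
  return order;
}
```

Here close_order() returns "cba": the last callback registered is the first one invoked.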
## [0.0.5] - 2021-04-28
### Added
- Add web_app.get(std::string path) and web_app.post(std::string path)
## [0.0.4] - 2021-04-27
### Added
- Add extensible req.helper to store and manipulate data per request
## [0.0.3] - 2021-04-25
### Changed
- Work around SIGSEGV when DUMP_MIDDLEWARE_ITEM macro is not defined
## [0.0.2] - 2021-04-25
### Added
- DUMP_MIDDLEWARE_ITEM macro to switch on/off dumping middleware_items
## [0.0.1] - 2021-04-25
### Added
- Initial PoC version as a single header file
- Subject to drastic changes
*/
#ifndef NICEXPRS_H
#define NICEXPRS_H
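#include <algorithm>
#include <functional>
#include <iostream>
#include <iterator>
#include <list>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <vector>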
#include <boost/stacktrace.hpp>
#include <boost/algorithm/string.hpp>
#include <nlohmann/json.hpp>
#include <openssl/sha.h>
#include <boost/archive/iterators/base64_from_binary.hpp>
#include <boost/archive/iterators/transform_width.hpp>
#if STREAMING_SUPPORT
#if BOOST_FILTER
#include <boost/iostreams/filtering_streambuf.hpp>
#endif
#endif
#include <nghttp2/asio_http2_server.h>
namespace nicexprs {
using json = nlohmann::json;
namespace h2a = nghttp2::asio_http2;
namespace h2s = nghttp2::asio_http2::server;
class web_app;
struct middleware_item;
#if STREAMING_SUPPORT
struct filter_item;
struct deferred_status;
#endif
class middleware_controller;
class helper_base;
class request;
class response;
typedef h2s::http2 http2;
typedef h2s::request_cb request_cb;
typedef h2a::header_map header_map;
typedef h2a::header_value header_value;
typedef h2a::uri_ref uri_ref;
typedef h2a::generator_cb generator_cb;
typedef h2a::close_cb close_cb;
typedef h2a::data_cb data_cb;
typedef std::size_t middleware_index;
typedef std::shared_ptr<web_app> web_app_ptr;
typedef std::function<void (request &req, response &res)> middleware_cb;
typedef std::function<void (response &res)> response_cb;
#if STREAMING_SUPPORT
typedef std::function<generator_cb (response &res, generator_cb src)> stream_cb;
typedef std::list<filter_item> filter_chain;
#endif
typedef std::function<void (response &res, std::string &method, std::string &raw_path_query, header_map &header)> push_cb; // synchronous callback
typedef std::list<middleware_item> middleware_chain;
typedef std::vector<middleware_item> middleware_vector;
typedef enum class next_type {
NEXT_APP,
FIRST_CHILD,
MATCHING_CHILD
} next_type;
typedef enum class route_type {
REQUEST, // /path/file.ext; from HTTP client
ALL, // ; route * for middleware_item
PREFIX, // /path/subpath; route /path/subpath for middleware_item without parameters
PARAMETERIZED_PREFIX, // /path; route /path/:param1/:param2 for middleware_item with parameters
REGEX // /path/v([^/]*); route /path/v([^/]*) for middleware_item with regex patterns
} route_type;
typedef enum class app_type {
APP,
DISPATCHER,
RAW_MIDDLEWARE
} app_type;
struct request_key {
std::string method_;
uri_ref uri_;
std::size_t prefix_length;
route_type type;
};
struct middleware_item {
middleware_index index;
std::string prefix;
std::string method;
std::string path;
middleware_cb cb;
middleware_index next;
app_type type;
app_type parent_type;
middleware_index parent_index;
middleware_index original_next;
middleware_index parent_next;
std::list<middleware_index> children;
std::map<std::string, std::map<request_key, middleware_index>> dispatch_map;
route_type r_type;
std::size_t prefix_length;
};
#if STREAMING_SUPPORT
struct filter_item {
filter_item(middleware_index index, response_cb buffered): index(index), buffered(buffered) {}
filter_item(middleware_index index, stream_cb streamed, response_cb on_header): index(index), streamed(streamed), on_header(on_header) {}
middleware_index index;
response_cb buffered;
stream_cb streamed;
response_cb on_header;
};
struct deferred_status {
typedef enum {
ORIGIN = std::string::npos
} deferred_index;
deferred_status(middleware_index index, bool deferred = false, bool resumed = false)
: index(index), deferred(deferred), resumed(resumed) {}
middleware_index index;
bool deferred;
bool resumed;
};
#endif
class helper_base {
public:
virtual ~helper_base() {}
};
inline bool operator<(const request_key &a, const request_key &b);
class request {
public:
request(const h2s::request &req)
: req(req),
key(request_key{req.method(), req.uri(), 0, route_type::REQUEST}),
is_matched(false),
header_(req.header()),
body() {
#if INSTANTIATION_LOG
std::cerr << "Constructed request " << key.uri_.raw_path << std::endl;
#endif
}
~request() {
#if INSTANTIATION_LOG
std::cerr << "Destructed request " << key.uri_.raw_path << std::endl;
#endif
}
header_map &header() {
return header_; // non-const header_map &
}
const std::string &method() const {
return key.method_;
}
uri_ref &uri() {
return key.uri_; // non-const uri_ref &
}
void on_data(data_cb cb) {
req.on_data(std::move(cb));
}
void next(uint32_t err = 0, next_type type = next_type::NEXT_APP);
bool match(); // side effects: update req.key.prefix_length and req.is_matched on matching
request_key key;
bool is_matched;
header_map header_;
std::string body; // raw request body
std::shared_ptr<helper_base> helper;
const h2s::request &req;
std::shared_ptr<middleware_controller> controller;
};
class response {
public:
response(const h2s::response &res)
: res(res),
is_header_written(false),
status_code_(0),
is_reading_body(false), // otherwise read uninitialized in resume()
is_trailer_expected(false),
#if STREAMING_SUPPORT
force_buffered_filtering(false),
#endif
is_push(false) {
#if INSTANTIATION_LOG
std::cerr << "Constructed response" << std::endl;
#endif
setup_on_close();
}
response(response &initiator, const h2s::response &res, std::string method, std::string raw_path_query, header_map header)
: res(res),
is_header_written(false),
status_code_(0),
is_reading_body(false),
response_filters(initiator.response_filters),
is_trailer_expected(false),
#if STREAMING_SUPPORT
force_buffered_filtering(initiator.force_buffered_filtering),
#endif
is_push(true),
push_method(method),
push_raw_path_query(raw_path_query),
push_header(header) {
#if INSTANTIATION_LOG
std::cerr << "Constructed response for push " << push_raw_path_query << std::endl;
#endif
#if STREAMING_LOG
std::cerr << "[response::response() for push]: response_filters.size() = " << response_filters.size() << std::endl;
#endif
setup_on_close();
}
~response() {
#if INSTANTIATION_LOG
if (is_push) {
std::cerr << "Destructed response for push " << push_raw_path_query << std::endl;
}
else {
std::cerr << "Destructed response" << std::endl;
}
#endif
}
// from nghttp2/src/http2.cc
bool expect_response_body(int status_code) {
return status_code == 101 ||
(status_code / 100 != 1 && status_code != 304 && status_code != 204);
}
// from nghttp2/src/http2.cc
bool expect_response_body(const std::string &method, int status_code) {
return method != "HEAD" && expect_response_body(status_code);
}
void write_head(unsigned int status_code, header_map h = header_map{});
unsigned int status_code() {
return status_code_;
}
header_map &header() {
return header_;
}
header_map &trailer() {
return trailer_;
}
std::shared_ptr<response> push(boost::system::error_code &ec,
std::string method,
std::string raw_path_query,
header_map header = header_map{}) {
auto push_it = push_filters.cbegin();
auto push_end = push_filters.cend();
while (push_it != push_end) {
(*push_it)(*this, method, raw_path_query, header);
push_it++;
}
const h2s::response *raw_push_res = res.push(ec, method, raw_path_query, header);
if (ec) {
return nullptr;
}
auto push_res = std::make_shared<response>(*this, *raw_push_res, method, raw_path_query, header); // TODO: it is inefficient to copy every element of response_filters
push_res->controller = controller;
push_responses.emplace(raw_path_query, push_res);
return push_res;
}
void end(std::string data = "");
void next(generator_cb::result_type err = 0) {
if (err < 0) {
res.end([err](uint8_t *buf, std::size_t len, uint32_t *data_flags)->ssize_t {
return err;
});
}
else {
it++;
if (it != response_filters.cend()) {
#if STREAMING_SUPPORT
if (!it->buffered) {
(wrap_stream_cb(it->streamed, it->on_header))(*this);
}
else {
(it->buffered)(*this);
}
#else // !STREAMING_SUPPORT
(*it)(*this);
#endif // !STREAMING_SUPPORT
}
else {
respond_body(*this);
}
}
}
response_cb buffer_body(generator_cb cb, std::size_t buflen = 16384) { // streaming is not supported
auto ss = std::make_shared<std::ostringstream>();
auto isEOF = std::make_shared<bool>(false);
return [cb, buflen, ss, isEOF](response &res) {
#if STREAMING_LOG
std::cerr << "[response::buffer_body callback] called; isEOF = " << *isEOF << " ss.str().size() = " << ss->str().size() << std::endl;
#endif
uint32_t data_flags;
ssize_t upstream_bytes = 0;
if (!*isEOF) {
std::vector<uint8_t> buf(buflen);
upstream_bytes = 0;
while (upstream_bytes >= 0) {
data_flags = NGHTTP2_DATA_FLAG_NONE;
upstream_bytes = cb(buf.data(), buf.size(), &data_flags);
if (upstream_bytes > 0) {
ss->write(reinterpret_cast<const char *>(buf.data()), upstream_bytes);
}
if (upstream_bytes < 0) {
break;
}
else if (data_flags & NGHTTP2_DATA_FLAG_EOF) {
*isEOF = true;
if (data_flags & NGHTTP2_DATA_FLAG_NO_END_STREAM) {
res.is_trailer_expected = true;
}
break;
}
}
}
if (*isEOF) {
res.body = ss->str();
res.is_reading_body = false;
if (res.it != res.response_filters.cend()) {
#if STREAMING_SUPPORT
if (!res.it->buffered) {
(res.wrap_stream_cb(res.it->streamed, res.it->on_header))(res); // next() on wrapped stream_cb
}
else {
(res.it->buffered)(res); // next()
}
#else // !STREAMING_SUPPORT
(*res.it)(res); // next()
#endif // !STREAMING_SUPPORT
}
else {
res.respond_body(res);
}
return;
}
else if (upstream_bytes == NGHTTP2_ERR_DEFERRED) {
return; // wait for asynchronous resume
}
else if (upstream_bytes == NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE) {
res.is_reading_body = false;
res.next(NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE);
return;
}
};
}
response_cb start_response();
void end(generator_cb cb) {
#if STREAMING_LOG
std::cerr << "end(generator_cb): response_filters.size() = " << response_filters.size() << std::endl;
#endif
if (response_filters.size() == 0 && trailer_filters.size() == 0) {
if (!is_header_written) {
is_header_written = true;
res.write_head(status_code(), header());
}
res.end(cb); // bypass empty filters
return;
}
#if STREAMING_SUPPORT
else if (!force_buffered_filtering && response_filters.size() > 0) {
// streaming
#if STREAMING_LOG
std::cerr << "end(generator_cb): streaming" << std::endl;
#endif
auto latently_deferred_generator_adaptor = [this](middleware_index index, generator_cb cb)->generator_cb {
auto latently_deferred = std::make_shared<deferred_status>(index, false);
this->latently_deferred_status.push_back(latently_deferred); // register the latently_deferred object for the cb
generator_cb wrapped_cb = [this, latently_deferred, cb](uint8_t *buf, std::size_t len, uint32_t *data_flags)->ssize_t {
if (latently_deferred->deferred) {
#if STREAMING_LOG
std::cerr << "[latently_deferred_generator_adaptor] index = "
<< (latently_deferred->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred->index))
<< " deferred = true; returning DEFER" << std::endl;
#endif
return NGHTTP2_ERR_DEFERRED; // block a call from the sink by not calling its source as it is still latently deferred
}
else {
// not latently deferred
#if STREAMING_LOG
std::cerr << "[latently_deferred_generator_adaptor] index = "
<< (latently_deferred->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred->index))
<< " deferred = false" << std::endl;
#endif
auto _ret = cb(buf, len, data_flags); // call the source
if (_ret == NGHTTP2_ERR_DEFERRED) {
// the source is deferred
#if STREAMING_LOG
std::cerr << "[latently_deferred_generator_adaptor] index = "
<< (latently_deferred->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred->index))
<< " the source returned DEFER" << std::endl;
#endif
if (latently_deferred->resumed) {
// the source has already been resumed; skip deferring
#if STREAMING_LOG
std::cerr << "[latently_deferred_generator_adaptor] index = "
<< (latently_deferred->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred->index))
<< " resumed = true; returning 0" << std::endl;
#endif
latently_deferred->resumed = false; // update the status
_ret = 0; // return 0 to avoid suspending HTTP/2 layer
}
else {
#if STREAMING_LOG
std::cerr << "[latently_deferred_generator_adaptor] index = "
<< (latently_deferred->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred->index))
<< " resumed = false; set deferred = true; returning DEFER" << std::endl;
#endif
latently_deferred->deferred = true; // update the status
}
}
return _ret;
}
};
return wrapped_cb;
};
generator_cb stream_generator_cb = latently_deferred_generator_adaptor(deferred_status::ORIGIN, cb);
for (auto item: response_filters) {
#if STREAMING_LOG
std::cerr << "end(generator_cb): stream_generator_cb = latently_deferred_generator_adaptor(item.index = "
<< item.index
<< ", (item.streamed)(*this, stream_generator_cb))" << std::endl;
#endif
stream_generator_cb = latently_deferred_generator_adaptor(item.index, (item.streamed)(*this, stream_generator_cb));
}
if (!is_header_written) {
auto it = response_filters.cbegin();
auto end = response_filters.cend();
while (it != end) {
if (it->on_header) {
(it->on_header)(*this); // synchronous call
}
it++;
}
is_header_written = true;
#if STREAMING_LOG
std::cerr << "end(generator_cb): res.write_head()" << std::endl;
#endif
res.write_head(status_code(), header());
}
#if STREAMING_LOG
std::cerr << "end(generator_cb): res.end(stream_generator_cb)" << std::endl;
#endif
res.end(stream_generator_cb); // start streaming
return;
}
else {
// buffered
#if STREAMING_LOG
std::cerr << "end(generator_cb): buffered" << std::endl;
#endif
read_body = buffer_body(cb);
respond_body = start_response();
it = response_filters.begin();
is_reading_body = true;
read_body(*this);
}
#else // !STREAMING_SUPPORT
read_body = buffer_body(cb);
respond_body = start_response();
it = response_filters.cbegin();
is_reading_body = true;
read_body(*this);
#endif // !STREAMING_SUPPORT
}
void write_trailer(header_map h = header_map{}) {
trailer_ = std::move(h);
#if STREAMING_SUPPORT
if (!force_buffered_filtering && response_filters.size() > 0) {
// streaming
is_trailer_expected = true;
do_write_trailer();
}
#endif
}
void do_write_trailer() {
if (is_trailer_expected) {
auto it_ = trailer_filters.cbegin();
auto cend = trailer_filters.cend();
while (it_ != cend) {
(*it_)(*this); // synchronous
it_++;
}
res.write_trailer(std::move(trailer_));
}
}
#if STREAMING_SUPPORT
// wrap stream_cb as response_cb for compatibility; no longer streaming
response_cb wrap_stream_cb(stream_cb cb, response_cb header_cb) {
auto is_initial = std::make_shared<bool>(true);
auto is_eof = std::make_shared<bool>(false);
auto left_bytes = std::make_shared<std::size_t>(0);
auto src_generator = std::make_shared<generator_cb>();
auto stream_generator = std::make_shared<generator_cb>();
auto buffer = std::make_shared<std::string>();
auto sink = std::make_shared<std::ostringstream>();
const std::size_t buffer_size = 16384; // bytes
return [cb, header_cb, is_initial, is_eof, left_bytes, src_generator, stream_generator, buffer, buffer_size, sink](response &res) {
if (*is_initial) {
#if STREAMING_LOG
std::cerr << "[wrapped stream_cb callback]: initial = true" << std::endl;
#endif
if (header_cb) {
header_cb(res);
}
*is_initial = false;
*left_bytes = res.body.size();
#if STREAMING_LOG
std::cerr << "[wrapped stream_cb callback]: res.body.size() = " << *left_bytes << std::endl;
#endif
buffer->resize(buffer_size);
*src_generator = [&res, is_eof, left_bytes](uint8_t *dst, std::size_t len, uint32_t *data_flags)->ssize_t {
if (*is_eof) {
#if STREAMING_LOG
std::cerr << "[wrapped stream_cb src_generator callback]: is_eof = " << *is_eof << std::endl;
#endif
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
return (ssize_t)0;
}
auto bytes = std::min(len, *left_bytes);
#if STREAMING_LOG
std::cerr << "[wrapped stream_cb src_generator callback]: called len = " << len << " left_bytes = " << *left_bytes << " bytes = " << bytes << std::endl;
#endif
std::copy_n(res.body.c_str() + (res.body.size() - *left_bytes), bytes, dst);
*left_bytes -= bytes;
if (*left_bytes == 0) {
#if STREAMING_LOG
std::cerr << "[wrapped stream_cb src_generator callback]: left_bytes = " << *left_bytes << std::endl;
#endif
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
*is_eof = true;
}
#if STREAMING_LOG
std::cerr << "[wrapped stream_cb src_generator callback]: return bytes = " << bytes << std::endl;
#endif
return (ssize_t)bytes;
};
*stream_generator = cb(res, *src_generator);
}
uint32_t data_flags = NGHTTP2_DATA_FLAG_NONE;
ssize_t bytes;
#if STREAMING_LOG
std::cerr << "[wrapped stream_cb callback]: calling stream_generator" << std::endl;
#endif
while ((bytes = (*stream_generator)((uint8_t*)buffer->data(), buffer_size, &data_flags)) > 0) {
#if STREAMING_LOG
std::cerr << "[wrapped stream_cb callback]: stream_generator bytes = " << bytes << std::endl;
#endif
sink->write(buffer->data(), bytes);
if (data_flags & NGHTTP2_DATA_FLAG_EOF) {
#if STREAMING_LOG
std::cerr << "[wrapped stream_cb callback]: stream_generator EOF" << std::endl;
#endif
break;
}
data_flags = NGHTTP2_DATA_FLAG_NONE; // reset the flag
}
#if STREAMING_LOG
std::cerr << "[wrapped stream_cb callback]: out of stream_generator bytes = " << bytes << std::endl;
#endif
if (bytes == 0 && (data_flags & NGHTTP2_DATA_FLAG_EOF)) { // test the EOF bit; `|` was always true
// fully generated without error
#if STREAMING_LOG
std::cerr << "[wrapped stream_cb callback]: bytes == 0 & data_flags EOF" << std::endl;
#endif
res.body = sink->str(); // update response body
#if STREAMING_LOG
std::cerr << "[wrapped stream_cb callback]: res.body =" << res.body << std::endl;
#endif
}
else if (bytes == NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE) {
// error in filtering
#if STREAMING_LOG
std::cerr << "[wrapped stream_cb callback]: bytes == NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE" << std::endl;
#endif
res.cancel();
}
else if (bytes == NGHTTP2_ERR_DEFERRED) {
#if STREAMING_LOG
std::cerr << "[wrapped stream_cb callback]: bytes == NGHTTP2_ERR_DEFERRED" << std::endl;
#endif
// deferred
// wait to be recalled until stream_generator calls res.resume()
}
res.next();
};
}
#endif // STREAMING_SUPPORT
#if STREAMING_SUPPORT
void on_response(response_cb cb);
#else // !STREAMING_SUPPORT
void on_response(response_cb cb) {
response_filters.push_front(std::move(cb));
}
#endif // !STREAMING_SUPPORT
#if STREAMING_SUPPORT
void on_response(response_cb on_header, stream_cb cb);
#endif
void on_push(push_cb cb) {
push_filters.push_front(std::move(cb));
}
void on_trailer(response_cb cb) {
trailer_filters.push_front(std::move(cb));
}
void on_close(close_cb cb) {
on_close_callbacks.push_front(std::move(cb));
}
void setup_on_close();
void cancel(uint32_t error_code = NGHTTP2_INTERNAL_ERROR) {
res.cancel(error_code);
}
void resume() {
if (is_header_written) {
#if STREAMING_SUPPORT
#if STREAMING_LOG
std::cerr << "[response::resume()] is_header_written = true" << std::endl;
#endif
if (latently_deferred_status.size() > 0) {
#if STREAMING_LOG
std::cerr << "[response::resume()] latently_deferred_status.size() = " << latently_deferred_status.size() << std::endl;
#endif
if (latently_deferred_status.front()->deferred) {
// lowest generator is deferred; HTTP/2 layer is ready to resume
#if STREAMING_LOG
std::cerr << "[response::resume()] latently_deferred_status.front()->index = "
<< (latently_deferred_status.front()->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred_status.front()->index))
<< " deferred = true" << std::endl;
#endif
auto it = latently_deferred_status.begin();
auto end = latently_deferred_status.end();
while (it != end) {
#if STREAMING_LOG
std::cerr << "[response::resume()] latently_deferred_status it->index = "
<< ((*it)->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string((*it)->index))
<< " deferred = " << (*it)->deferred << " resumed = " << (*it)->resumed << std::endl;
#endif
(*it)->deferred = false; // reset deferred status
(*it)->resumed = false; // reset resumed status
it++;
}
#if STREAMING_LOG
std::cerr << "[response::resume()] calling res.resume()" << std::endl;
#endif
res.resume();
}
else {
// lowest generator is not deferred; deferred status is still latent
#if STREAMING_LOG
std::cerr << "[response::resume()] latently_deferred_status.front()->index = "
<< (latently_deferred_status.front()->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred_status.front()->index))
<< " deferred = false" << std::endl;
std::cerr << "[response::resume()] latently_deferred_status.front()->index = "
<< (latently_deferred_status.front()->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred_status.front()->index))
<< " setting resumed = true" << std::endl;
#endif
latently_deferred_status.front()->resumed = true;
}
}
else {
// latently_deferred_status is empty
res.resume();
}
#else // !STREAMING_SUPPORT
res.resume();
#endif // !STREAMING_SUPPORT
}
else {
#if STREAMING_SUPPORT
if (!is_reading_body && it != response_filters.end()) {
if (!it->buffered) {
it->buffered = wrap_stream_cb(it->streamed, it->on_header);
}
(it->buffered)(*this); // resume reading from source
}
#else // !STREAMING_SUPPORT
if (!is_reading_body && it != response_filters.cend()) {
(*it)(*this); // resume reading from source
}
#endif // !STREAMING_SUPPORT
else {
read_body(*this);
}
}
}
bool is_header_written;
unsigned int status_code_;
header_map header_;
std::string body; // streaming not supported
bool is_trailer_expected;
header_map trailer_;
bool is_push;
std::string push_method;
std::string push_raw_path_query;
header_map push_header;
std::list<close_cb> on_close_callbacks;
response_cb read_body;
bool is_reading_body;
response_cb respond_body;
#if STREAMING_SUPPORT
bool force_buffered_filtering;
std::list<filter_item> response_filters;
std::list<filter_item>::iterator it;
std::list<std::shared_ptr<deferred_status>> latently_deferred_status;
#else // !STREAMING_SUPPORT
std::list<response_cb> response_filters;
std::list<response_cb>::const_iterator it;
#endif // !STREAMING_SUPPORT
std::list<response_cb> trailer_filters;
std::list<push_cb> push_filters;
std::map<std::string, std::shared_ptr<response>> push_responses;
const h2s::response &res;
std::shared_ptr<middleware_controller> controller;
};
class middleware_controller {
public:
static request_cb handler(std::shared_ptr<middleware_vector> chain) {
if (chain->size() < 1) {
return [](const h2s::request &req_original, const h2s::response &res_original){
res_original.cancel();
};
}
return [chain](const h2s::request &req_original, const h2s::response &res_original){
auto controller = std::make_shared<middleware_controller>(req_original, res_original, chain);
controller->set_controller(controller);
controller->it = controller->request_filters->cbegin();
(*controller->it).cb(controller->req, controller->res);
};
}
public:
middleware_controller(const h2s::request &req_, const h2s::response &res_, std::shared_ptr<middleware_vector> chain)
: req(req_), res(res_), request_filters(chain) {
#if INSTANTIATION_LOG
std::cerr << "Constructed middleware_controller " << req.key.uri_.path << std::endl;
#endif
}
~middleware_controller() {
#if INSTANTIATION_LOG
std::cerr << "Destructed middleware_controller " << req.key.uri_.path << std::endl;
#endif
}
void set_controller(std::shared_ptr<middleware_controller> controller) {
req.controller = controller;
res.controller = controller;
}
void next(uint32_t err = 0, next_type type = next_type::NEXT_APP) {
if (err > 0) {
res.cancel(err);
}
else {
auto index = it->index;
auto next = it->next;
switch (type) {
case next_type::NEXT_APP:
std::advance(it, next - index);
break;
case next_type::FIRST_CHILD:
it++;
break;
case next_type::MATCHING_CHILD:
std::map<std::string, std::map<request_key, middleware_index>> &dispatch_map = (*request_filters)[index].dispatch_map;
#if MATCHING_LOG
std::cerr << "next(MATCHING_CHILD) from " << it->index << ":" << it->path << " for " << req.key.uri_.path << " with prefix " << req.key.uri_.path.substr(0, req.key.prefix_length) << " " << req.key.prefix_length << std::endl;
#endif
auto method_it = dispatch_map.find(req.key.method_);
if (method_it != dispatch_map.end()) {
#if MATCHING_LOG
std::cerr << "next(MATCHING_CHILD) found method " << req.key.method_ << " from " << index << ":" << (*request_filters)[index].path
<< " req.key.prefix_length=" << req.key.prefix_length << " " << req.key.uri_.path.substr(0, req.key.prefix_length) << std::endl;
#endif
auto child_it = method_it->second.find(req.key);
if (child_it != method_it->second.end()) {
next = child_it->second;
#if MATCHING_LOG
std::cerr << "next(MATCHING_CHILD) found matching key " << next << ":" << (*request_filters)[next].path << std::endl;
#endif
req.is_matched = true;
}
else {
#if MATCHING_LOG
std::cerr << "next(MATCHING_CHILD) did not find matching key for " << req.key.uri_.path << std::endl;
#endif
method_it = dispatch_map.find("*");
if (method_it != dispatch_map.end()) {
#if MATCHING_LOG
std::cerr << "next(MATCHING_CHILD) found method " << req.key.method_ << " from " << index << ":" << (*request_filters)[index].path << std::endl;
#endif
child_it = method_it->second.find(req.key);
if (child_it != method_it->second.end()) {
next = child_it->second;
#if MATCHING_LOG
std::cerr << "next(MATCHING_CHILD) found matching key " << next << ":" << (*request_filters)[next].path << std::endl;
#endif
req.is_matched = true;
}
}
}
}
else {
method_it = dispatch_map.find("*");
#if MATCHING_LOG
std::cerr << "next(MATCHING_CHILD) found method " << req.key.method_ << " from " << index << ":" << (*request_filters)[index].path << std::endl;
#endif
if (method_it != dispatch_map.end()) {
auto child_it = method_it->second.find(req.key);
if (child_it != method_it->second.end()) {
next = child_it->second;
#if MATCHING_LOG
std::cerr << "next(MATCHING_CHILD) found matching key " << next << ":" << (*request_filters)[next].path << std::endl;
#endif
req.is_matched = true;
}
}
}
std::advance(it, next - index);
break;
}
if (it != request_filters->cend()) {
req.key.prefix_length = it->prefix.size();
it->cb(req, res);
}
}
}
std::shared_ptr<middleware_vector> request_filters;
middleware_vector::const_iterator it;
request req;
response res;
};
class web_app: public std::enable_shared_from_this<web_app> {
public:
static bool key_less(const request_key &a, const request_key &b) {
std::size_t a_size, b_size;
int cmp;
if (a.type == route_type::REQUEST) {
if (b.type == route_type::REQUEST) {
return a.uri_.path < b.uri_.path;
}
else {
switch (b.type) {
case route_type::PREFIX:
#if MATCHING_LOG
if (a.uri_.path.size() < a.prefix_length) {
std::cerr << a.uri_.path << " a.uri_.path.size() < a.prefix_length " << a.prefix_length << std::endl;
}
#endif
a_size = a.uri_.path.size() - a.prefix_length;
b_size = b.uri_.path.size();
if (b_size < a_size) {
cmp = a.uri_.path.compare(a.prefix_length, b_size,
b.uri_.path, 0, b_size);
if (cmp == 0) {
if (a.uri_.path[a.prefix_length + b_size] == '/') {
// a is a subfolder of b; a matches b
return false; // a < b is false
}
else {
// a is not a subfolder of b; a starts with b; a > b
return false; // a < b is false
}
}
else {
return cmp < 0; // true if a < b
}
}
else if (b_size == a_size) {
cmp = a.uri_.path.compare(a.prefix_length, b_size,
b.uri_.path, 0, b_size);
if (cmp == 0) {
return false; // a == b; a matches b
}
else {
return cmp < 0;
}
}
else { // b_size > a_size
cmp = a.uri_.path.compare(a.prefix_length, a_size,
b.uri_.path, 0, b_size);
return cmp < 0;
}
break;
case route_type::PARAMETERIZED_PREFIX:
#if MATCHING_LOG
if (a.uri_.path.size() < a.prefix_length) {
std::cerr << a.uri_.path << " a.uri_.path.size() < a.prefix_length " << a.prefix_length << std::endl;
}
#endif
a_size = a.uri_.path.size() - a.prefix_length;
b_size = b.prefix_length; // excluding parameters and preceding /
if (b_size < a_size) {
cmp = a.uri_.path.compare(a.prefix_length, b_size,
b.uri_.path, 0, b_size);
if (cmp == 0) {
if (a.uri_.path[a.prefix_length + b_size] == '/') {
// a is a subfolder of b; a matches b
return false; // a < b is false
}
else {
// a is not a subfolder of b; a starts with b; a > b
return false;
}
}
else {
return cmp < 0; // true if a < b
}
}
else if (b_size == a_size) {
cmp = a.uri_.path.compare(a.prefix_length, b_size,
b.uri_.path, 0, b_size);
if (cmp == 0) {
return false; // a == b; a matches b
}
else {
return cmp < 0;
}
}
else { // b_size > a_size
cmp = a.uri_.path.compare(a.prefix_length, a_size,
b.uri_.path, 0, b_size);
return cmp < 0;
}
break;
case route_type::REGEX: // unexpected; any < re
case route_type::ALL: // unexpected; any < *
default:
return true;
}
}
}
else if (b.type == route_type::REQUEST) { // a.type != REQUEST
switch (a.type) {
case route_type::PREFIX:
#if MATCHING_LOG
if (b.uri_.path.size() < b.prefix_length) {
std::cerr << b.uri_.path << " b.uri_.path.size() < b.prefix_length " << b.prefix_length << std::endl;
}
#endif
a_size = a.uri_.path.size();
b_size = b.uri_.path.size() - b.prefix_length;
if (a_size < b_size) {
cmp = b.uri_.path.compare(b.prefix_length, a_size,
a.uri_.path, 0, a_size);
if (cmp == 0) {
if (b.uri_.path[b.prefix_length + a_size] == '/') {
// b is a subfolder of a; b matches a
return false; // a < b is false
}
else {
// b is not a subfolder of a; b starts with a; a < b
return true; // true if a < b
}
}
else {
return cmp > 0; // true if a < b
}
}
else if (b_size == a_size) {
cmp = b.uri_.path.compare(b.prefix_length, a_size,
a.uri_.path, 0, a_size);
if (cmp == 0) {
return false; // a == b; b matches a
}
else {
return cmp > 0;
}
}
else { // a_size > b_size
cmp = b.uri_.path.compare(b.prefix_length, a_size,
a.uri_.path, 0, a_size);
return cmp > 0;
}
break;
case route_type::PARAMETERIZED_PREFIX:
#if MATCHING_LOG
if (b.uri_.path.size() < b.prefix_length) {
std::cerr << b.uri_.path << " b.uri_.path.size() < b.prefix_length " << b.prefix_length << std::endl;
}
#endif
a_size = a.prefix_length; // excluding parameters and preceding /
b_size = b.uri_.path.size() - b.prefix_length;
cmp = b.uri_.path.compare(b.prefix_length, a_size,
a.uri_.path, 0, a_size);
if (a_size < b_size) {
if (cmp == 0) {
if (b.uri_.path[b.prefix_length + a_size] == '/') {
// b is a subfolder of a; b matches a
return false; // a < b is false
}
else {
// b is not a subfolder of a; b starts with a; a < b
return true;
}
}
else {
return cmp > 0; // true if a < b
}
}
else if (a_size == b_size) {
if (cmp == 0) {
return false; // a == b; a matches b
}
else {
return cmp > 0;
}
}
else { // a_size > b_size
return cmp > 0;
}
break;
case route_type::REGEX: // unexpected; any < re
case route_type::ALL: // unexpected; any < *
default:
return false;
}
}
else { // a != REQUEST && b != REQUEST
switch (a.type) {
case route_type::PREFIX:
a_size = a.uri_.path.size();
switch (b.type) {
case route_type::PREFIX:
b_size = b.uri_.path.size();
cmp = a.uri_.path.compare(0, a_size,
b.uri_.path,
0, b_size);
return cmp < 0;
case route_type::PARAMETERIZED_PREFIX:
b_size = b.prefix_length;
cmp = a.uri_.path.compare(0, a_size,
b.uri_.path,
0, b_size);
return cmp < 0;
case route_type::REGEX: // unexpected; any < re
case route_type::ALL: // unexpected; any < *
default:
return true;
}
case route_type::PARAMETERIZED_PREFIX:
a_size = a.prefix_length;
switch (b.type) {
case route_type::PREFIX:
b_size = b.uri_.path.size();
cmp = a.uri_.path.compare(0, a_size,
b.uri_.path,
0, b_size);
return cmp < 0;
case route_type::PARAMETERIZED_PREFIX:
b_size = b.prefix_length;
cmp = a.uri_.path.compare(0, a_size,
b.uri_.path,
0, b_size);
return cmp < 0;
case route_type::REGEX: // unexpected: any < re
case route_type::ALL: // unexpected: any < *
default:
return true;
}
case route_type::REGEX: // unexpected
case route_type::ALL: // unexpected
default:
switch (b.type) {
case route_type::PREFIX: // unexpected: any < re/*
case route_type::PARAMETERIZED_PREFIX: // unexpected: any < re/*
return false;
case route_type::REGEX: // unexpected
case route_type::ALL: // unexpected
default:
return a.uri_.path < b.uri_.path;
}
}
}
}
static std::shared_ptr<web_app> create() {
return std::make_shared<web_app>();
}
static std::shared_ptr<web_app> copy(std::shared_ptr<web_app> original) {
return std::make_shared<web_app>(original)->copy_app_chain(original);
}
public:
web_app()
: type(app_type::APP),
path_("*"),
method_("*"),
app_chain(std::make_shared<std::list<std::shared_ptr<web_app>>>()),
parent_() {
#if INSTANTIATION_LOG
std::cerr << "Constructed web_app " << path_ << std::endl;
#endif
}
web_app(app_type type, std::string method, std::string path, std::shared_ptr<web_app> parent = nullptr)
: type(type),
path_(path),
method_(method),
app_chain(std::make_shared<std::list<std::shared_ptr<web_app>>>()),
parent_(parent) {
#if INSTANTIATION_LOG
std::cerr << "Constructed web_app " << path_ << std::endl;
#endif
}
web_app(std::string method, std::string path, middleware_cb cb, std::shared_ptr<web_app> parent = nullptr)
: type(app_type::RAW_MIDDLEWARE),
path_(path),
method_(method),
cb(cb),
app_chain(std::make_shared<std::list<std::shared_ptr<web_app>>>()),
parent_(parent) {
#if INSTANTIATION_LOG
std::cerr << "Constructed web_app " << path_ << std::endl;
#endif
}
web_app(const std::shared_ptr<web_app> original)
: type(original->type),
path_(original->path_),
method_(original->method_),
cb(original->cb),
app_chain(),
parent_() {
#if INSTANTIATION_LOG
std::cerr << "Constructed web_app " << path_ << " by copying" << std::endl;
#endif
}
~web_app() {
#if INSTANTIATION_LOG
std::cerr << "Destructed web_app " << path_ << std::endl;
#endif
}
std::shared_ptr<web_app> ptr() {
return shared_from_this();
}
std::shared_ptr<web_app> copy_app_chain(const std::shared_ptr<web_app> original) {
app_chain = std::make_shared<std::list<std::shared_ptr<web_app>>>();
auto it = original->app_chain->cbegin();
while (it != original->app_chain->cend()) {
auto clone = std::make_shared<web_app>(*it);
clone->copy_app_chain(*it);
clone->parent_ = shared_from_this();
app_chain->push_back(clone);
it++;
}
return shared_from_this();
}
std::size_t get_prefix_length() {
auto param_it = path_.find("/:");
std::size_t prefix_length__ = 0;
if (param_it != std::string::npos) {
prefix_length__ = param_it;
}
else if (path_.find("(") != std::string::npos) { // TODO: a bare '(' is too ambiguous a marker for regex routes
prefix_length__ = path_.size(); // TODO:
}
else if (path_ == "*") {
prefix_length__ = 0;
}
else {
prefix_length__ = path_.size();
}
return prefix_length__;
}
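/*
The rule implemented by get_prefix_length() above can be illustrated with a
stand-alone sketch. The free function name sketch_prefix_length is hypothetical
and is not part of nicexprs.h; it only mirrors the branches above, assuming that
routes containing '(' keep using the whole path until regex routes are implemented.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical free-function mirror of web_app::get_prefix_length().
inline std::size_t sketch_prefix_length(const std::string &path) {
  const std::size_t param_pos = path.find("/:");
  if (param_pos != std::string::npos) {
    return param_pos;   // "/users/:id" keeps only the literal part "/users"
  }
  if (path == "*") {
    return 0;           // the wildcard contributes no literal prefix
  }
  return path.size();   // literal routes (and, for now, routes containing '(')
}
```

Under these assumptions "/users/:id" yields 6, "*" yields 0 and "/static" yields 7.
*/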
void set_prefix(std::string parent_prefix) {
prefix = parent_prefix;
std::string my_prefix;
auto it = app_chain->begin();
switch (type) {
case app_type::APP:
case app_type::DISPATCHER:
my_prefix = parent_prefix + path_.substr(0, get_prefix_length());
while (it != app_chain->end()) {
(*it)->set_prefix(my_prefix);
it++;
}
break;
case app_type::RAW_MIDDLEWARE:
default:
break;
}
}
operator request_cb() {
if (type == app_type::APP) {
if (path_.size() > 0 && path_[path_.size() - 1] == '/') {
path_ = path_.substr(0, path_.size() - 1); // remove trailing / from the mount point path_
}
}
set_prefix(path_);
auto chain = (std::shared_ptr<middleware_chain>)*this;
if (type == app_type::APP) {
chain->pop_front(); // pop self app;
}
auto it = chain->begin();
middleware_index index = 0;
while (it != chain->end()) {
it->index = index;
it->next = it->index + it->next;
it++;
index++;
}
struct quadple {
app_type type;
middleware_index index;
middleware_index original_next;
middleware_index updated_next;
};
std::list<quadple> parent_type_stack;
parent_type_stack.push_front(quadple{ app_type::APP, chain->size(), chain->size(), chain->size() });
it = chain->begin();
while (it != chain->end()) {
auto next_it = it;
next_it++;
it->parent_type = parent_type_stack.front().type;
it->parent_index = parent_type_stack.front().index;
it->original_next = it->next;
it->parent_next = parent_type_stack.front().updated_next;
std::list<quadple>::iterator parent_type_stack_it;
switch (it->parent_type) {
case app_type::DISPATCHER:
it->next = it->parent_next;
parent_type_stack_it = parent_type_stack.begin();
while (parent_type_stack_it != parent_type_stack.end() && parent_type_stack_it->index < chain->size()) {
parent_type_stack_it++;
if (parent_type_stack_it->index >= it->next) {
break;
}
}
break;
case app_type::APP:
if (next_it == chain->end() || next_it->index >= parent_type_stack.front().original_next) {
it->next = it->parent_next;
}
if (it->next == it->parent_next) {
parent_type_stack_it = parent_type_stack.begin();
while (parent_type_stack_it != parent_type_stack.end() && parent_type_stack_it->index < chain->size()) {
parent_type_stack_it++;
if (parent_type_stack_it->index >= it->next) {
break;
}
}
}
break;
case app_type::RAW_MIDDLEWARE:
break;
default:
break;
}
switch (it->type) {
case app_type::APP:
case app_type::DISPATCHER:
parent_type_stack.push_front(quadple{ it->type, it->index, it->original_next, it->next });
break;
case app_type::RAW_MIDDLEWARE:
default:
break;
}
while (parent_type_stack.size() > 0 && it->index + 1 >= parent_type_stack.front().original_next) {
parent_type_stack.pop_front();
}
it++;
}
auto vector_ = std::make_shared<middleware_vector>(chain->begin(), chain->end());
for (middleware_index index = 0; index < vector_->size(); index++) {
if ((*vector_)[index].parent_index < vector_->size()) {
(*vector_)[(*vector_)[index].parent_index].children.push_back(index);
}
auto param_it = (*vector_)[index].path.find("/:");
std::size_t prefix_length = 0;
route_type r_type;
if (param_it != std::string::npos) {
r_type = route_type::PARAMETERIZED_PREFIX;
prefix_length = param_it;
}
else if ((*vector_)[index].path.find("(") != std::string::npos) { // TODO: a bare '(' is too ambiguous a marker for regex routes
r_type = route_type::REGEX;
}
else if ((*vector_)[index].path == "*") {
r_type = route_type::ALL;
}
else {
r_type = route_type::PREFIX;
}
(*vector_)[index].r_type = r_type;
(*vector_)[index].prefix_length = prefix_length;
}
for (middleware_index index = 0; index < vector_->size(); index++) {
if ((*vector_)[index].type == app_type::DISPATCHER) {
std::for_each((*vector_)[index].children.cbegin(), (*vector_)[index].children.cend(), [vector_, index](middleware_index child_it) {
auto method_it = (*vector_)[index].dispatch_map.find((*vector_)[child_it].method);
if (method_it == (*vector_)[index].dispatch_map.end()) {
std::map<request_key, middleware_index> method_map;
(*vector_)[index].dispatch_map.emplace((*vector_)[child_it].method, std::move(method_map));
}
method_it = (*vector_)[index].dispatch_map.find((*vector_)[child_it].method);
if (method_it == (*vector_)[index].dispatch_map.end()) {
std::cerr << "failed to emplace in dispatch_map" << std::endl;
return;
}
std::size_t prefix_length = (*vector_)[child_it].prefix_length;
route_type r_type = (*vector_)[child_it].r_type;
if (r_type == route_type::PREFIX || r_type == route_type::PARAMETERIZED_PREFIX) {
method_it->second.emplace(request_key{
(*vector_)[child_it].method,
uri_ref{
"https",
"localhost",
(*vector_)[child_it].path,
(*vector_)[child_it].path,
"",
""
},
prefix_length,
r_type
},
(*vector_)[child_it].index
);
}
else {
// TODO: ALL and REGEX
}
});
auto all_method_it = (*vector_)[index].dispatch_map.find("*");
if (all_method_it != (*vector_)[index].dispatch_map.end()) {
// take the map entries by const reference to avoid copying keys and whole per-method maps on every iteration
std::for_each(all_method_it->second.begin(), all_method_it->second.end(), [vector_, index](const std::pair<const request_key, middleware_index> &child_it) {
std::for_each((*vector_)[index].dispatch_map.begin(), (*vector_)[index].dispatch_map.end(),
[vector_, index, &child_it](const std::pair<const std::string, std::map<request_key, middleware_index>> &method_it_){
if (method_it_.first != "*") {
//std::cerr << "emplacing " << child_it.first.uri_.path << " into " << method_it_.first << std::endl;
auto method_it__ = (*vector_)[index].dispatch_map.find(method_it_.first);
method_it__->second.emplace(request_key{
method_it_.first,
child_it.first.uri_,
child_it.first.prefix_length,
child_it.first.type
},
child_it.second
);
}
});
});
}
}
}
std::ostringstream oss;
std::for_each(vector_->cbegin(), vector_->cend(), [&oss](middleware_item item) {
oss << "middleware_item { index: " << item.index
<< ", method: " << item.method
<< ", prefix: " << item.prefix
<< ", path: " << item.path
<< ", next: " << item.next
<< ", type:" << [](app_type type)->std::string {
switch (type) {
case app_type::APP: return "APP";
case app_type::DISPATCHER: return "DISPATCHER";
case app_type::RAW_MIDDLEWARE: return "RAW_MIDDLEWARE";
default: return std::to_string((int)type);
}}(item.type)
<< ", parent_type:" << [](app_type type)->std::string {
switch (type) {
case app_type::APP: return "APP";
case app_type::DISPATCHER: return "DISPATCHER";
case app_type::RAW_MIDDLEWARE: return "RAW_MIDDLEWARE";
default: return std::to_string((int)type);
}}(item.parent_type)
<< ", parent_index: " << item.parent_index
<< ", parent_next: " << item.parent_next
<< ", children ";
bool first = true;
oss << "[ ";
std::for_each(item.children.cbegin(), item.children.cend(), [&first, &oss](middleware_index child){
if (first) {
first = false;
}
else {
oss << ", ";
}
oss << child;
});
oss << " ] ";
first = true;
oss << ", dispatch_map [ ";
std::for_each(item.dispatch_map.cbegin(), item.dispatch_map.cend(), [&first, &oss](std::pair<std::string, std::map<request_key, middleware_index>> method_it){
if (first) {
first = false;
}
else {
oss << ", ";
}
oss << method_it.first << ": [ ";
auto first_ = true;
std::for_each(method_it.second.cbegin(), method_it.second.cend(), [&first_, &oss](std::pair<request_key, middleware_index> route_it){
if (first_) {
first_ = false;
}
else {
oss << ", ";
}
oss << route_it.first.uri_.path;
});
oss << " ] ";
});
oss << " ] ";
oss << "}" << std::endl;
});
#ifdef DUMP_MIDDLEWARE_ITEM
std::cerr << oss.str();
#endif
return middleware_controller::handler(vector_);
}
operator std::shared_ptr<middleware_chain>() {
std::shared_ptr<middleware_chain> chain;
switch (type) {
case app_type::RAW_MIDDLEWARE:
chain = std::make_shared<middleware_chain>();
chain->push_back(middleware_item{ 0, prefix, method_, path_, generate_raw_middleware(), 1, type });
return chain;
case app_type::DISPATCHER:
return generate_dispatch_middleware();
case app_type::APP:
return generate_app_middleware();
default:
chain = nullptr;
return chain;
}
}
std::shared_ptr<middleware_chain> generate_app_middleware() {
auto chain = std::make_shared<middleware_chain>();
auto it = app_chain->cbegin();
auto cend = app_chain->cend();
middleware_cb app_cb = [this](request &req, response &res){
if (req.match()) {
#if MATCHING_LOG
std::cerr << "app_cb " << "req " << req.uri().path << " matched " << req.controller->it->index << ":" << req.controller->it->path << " with prefix " << req.uri().path.substr(0, req.key.prefix_length) << " " << req.key.prefix_length << std::endl;
#endif
req.next(0, next_type::FIRST_CHILD);
}
else {
#if MATCHING_LOG
std::cerr << "app_cb " << "req " << req.uri().path << " did not match " << req.controller->it->index << ":" << req.controller->it->path << " " << req.key.prefix_length << std::endl;
#endif
req.next();
}
};
cb = app_cb;
while (it != cend) {
auto subchain = (std::shared_ptr<middleware_chain>)(**it);
for (auto it_sub = subchain->cbegin(); it_sub != subchain->cend(); it_sub++) {
chain->push_back(*it_sub);
}
it++;
}
chain->push_front(middleware_item{ 0, prefix, method_, path_, cb, chain->size() + 1 /* add self */, type });
return chain;
}
std::shared_ptr<middleware_chain> generate_dispatch_middleware() {
auto chain = std::make_shared<middleware_chain>();
auto it = app_chain->cbegin();
auto cend = app_chain->cend();
middleware_cb dispatch_cb = [this](request &req, response &res){
if (req.match()) {
req.key.prefix_length = (*req.controller->request_filters)[req.controller->it->index + 1].prefix.size();
#if MATCHING_LOG
std::cerr << "dispatch_cb " << "req " << req.uri().path << " matched " << req.controller->it->index << ":" << req.controller->it->path << " with prefix " << req.uri().path.substr(0, req.key.prefix_length) << " " << req.key.prefix_length << std::endl;
#endif
req.next(0, next_type::MATCHING_CHILD);
}
else {
#if MATCHING_LOG
std::cerr << "dispatch_cb " << "req " << req.uri().path << " did not match " << req.controller->it->index << ":" << req.controller->it->path << " " << req.key.prefix_length << std::endl;
#endif
req.next();
}
};
cb = dispatch_cb;
while (it != cend) {
auto subchain = (std::shared_ptr<middleware_chain>)(**it);
for (auto it_sub = subchain->cbegin(); it_sub != subchain->cend(); it_sub++) {
chain->push_back(*it_sub);
}
it++;
}
chain->push_front(middleware_item{ 0, prefix, method_, path_, cb, chain->size() + 1 /* add self */, type });
return chain;
}
middleware_cb generate_raw_middleware() {
if (false && method_ == "*" && path_ == "*") {
return cb;
}
else {
return [this](request &req, response &res) {
if (req.match()) { // req.is_matched is reset, req.key.prefix_length is updated
#if MATCHING_LOG
std::cerr << "raw_middleware_cb " << "req " << req.uri().path << " matched " << req.controller->it->index << ":" << req.controller->it->path << " with prefix " << req.uri().path.substr(0, req.key.prefix_length) << " " << req.key.prefix_length << std::endl;
#endif
this->cb(req, res);
}
else {
#if MATCHING_LOG
std::cerr << "raw_middleware_cb " << "req " << req.uri().path << " did not match " << req.controller->it->index << ":" << req.controller->it->path << " " << req.key.prefix_length << std::endl;
#endif
req.next();
}
};
}
}
web_app &use(std::shared_ptr<web_app> app) {
app->parent_ = shared_from_this();
app_chain->push_back(app);
return *this;
}
web_app &use(middleware_cb cb) {
auto app = std::make_shared<web_app>("*", "*", cb);
app->parent_ = shared_from_this();
app_chain->push_back(app);
return *this;
}
web_app &use(std::string path, std::shared_ptr<web_app> app) {
app->path_ = path;
app->parent_ = shared_from_this();
app_chain->push_back(app);
return *this;
}
web_app &use(std::string path, middleware_cb cb) {
auto app = std::make_shared<web_app>("*", path, cb);
app->parent_ = shared_from_this();
app_chain->push_back(app);
return *this;
}
web_app &all(middleware_cb cb) {
return use(cb);
}
web_app &all(std::string path = "*") {
auto app = std::make_shared<web_app>(app_type::APP, "*", path);
app->parent_ = shared_from_this();
app_chain->push_back(app);
return *app;
}
web_app &all(std::shared_ptr<web_app> app) {
return use(app);
}
web_app &all(std::string path, middleware_cb cb) {
return use(path, cb);
}
web_app &all(std::string path, std::shared_ptr<web_app> app) {
return use(path, app);
}
web_app &get(std::string path, std::shared_ptr<web_app> app) {
app->method_ = "GET";
app->path_ = path;
app->parent_ = shared_from_this();
app_chain->push_back(app);
return *this;
}
web_app &get(std::string path, middleware_cb cb) {
auto app = std::make_shared<web_app>("GET", path, cb);
app->parent_ = shared_from_this();
app_chain->push_back(app);
return *this;
}
web_app &get(std::string path = "*") {
auto app = std::make_shared<web_app>(app_type::APP, "GET", path);
app->parent_ = shared_from_this();
app_chain->push_back(app);
return *app;
}
web_app &post(std::string path, std::shared_ptr<web_app> app) {
app->method_ = "POST";
app->path_ = path;
app->parent_ = shared_from_this();
app_chain->push_back(app);
return *this;
}
web_app &post(std::string path, middleware_cb cb) {
auto app = std::make_shared<web_app>("POST", path, cb);
app->parent_ = shared_from_this();
app_chain->push_back(app);
return *this;
}
web_app &post(std::string path = "*") {
auto app = std::make_shared<web_app>(app_type::APP, "POST", path);
app->parent_ = shared_from_this();
app_chain->push_back(app);
return *app;
}
web_app &dispatch(std::string path = "*") {
auto app = std::make_shared<web_app>(app_type::DISPATCHER, "*", path);
app->parent_ = shared_from_this();
app_chain->push_back(app);
return *app;
}
web_app &parent() {
// lock() once instead of expired() followed by lock(), so the parent cannot expire between the two calls
if (auto parent_ptr = parent_.lock()) {
return *parent_ptr;
}
return *this; // no parent
}
web_app &mount(std::string mount_point, http2 &server) {
path_ = mount_point;
server.handle(path_, (request_cb)*this);
return *this;
}
std::string path() {
return path_;
}
public:
app_type type;
std::string path_;
std::string method_;
middleware_cb cb;
std::shared_ptr<std::list<std::shared_ptr<web_app>>> app_chain;
std::string prefix;
std::weak_ptr<web_app> parent_;
};
inline void request::next(uint32_t err, next_type type) {
controller->next(err, type);
}
inline bool request::match() { // side effects: update req.key.prefix_length on matching
if (is_matched) {
#if MATCHING_LOG
std::cerr << key.uri_.path << " request::match() resetting is_matched from true to false; prefix_length = " << key.prefix_length << ":" << key.uri_.path.substr(0, key.prefix_length) << std::endl;
#endif
is_matched = false; // reset is_matched flag
return true; // matched on dispatching
}
else {
const middleware_item &item = *(controller->it);
if (item.method == "*" || item.method == key.method_) {
// method matched
const std::string &route = item.path;
const std::string &path = key.uri_.path;
std::size_t route_size, key_size;
switch (item.r_type) {
case route_type::ALL: // *
// no req.key.prefix_length update
return true;
case route_type::PREFIX: // /routepath
#if MATCHING_LOG
if (path.size() < key.prefix_length) {
std::cerr << path << " " << "path.size() < key.prefix_length" << " prefix_length = " << key.prefix_length << std::endl;
}
#endif
key_size = path.size() - key.prefix_length;
route_size = route.size();
if (route_size > key_size) {
return false; // /shortpath never matches /long/routepath
}
else {
// route_size <= key_size
if (path.compare(key.prefix_length, route_size, route, 0, route_size) == 0) {
if (route_size == key_size) {
// exact match
switch (item.type) {
case app_type::APP:
case app_type::DISPATCHER:
key.prefix_length += route_size;
break;
case app_type::RAW_MIDDLEWARE:
default:
key.prefix_length += route_size;
break;
}
return true;
}
else {
// route_size < key_size
if (path[key.prefix_length + route_size] == '/') {
// path is a subfolder of route; path matches route
switch (item.type) {
case app_type::APP:
case app_type::DISPATCHER:
key.prefix_length += route_size;
break;
case app_type::RAW_MIDDLEWARE:
default:
key.prefix_length += route_size;
break;
}
return true;
}
else {
// path is not a subfolder of route; path starts with route; path does not match route
return false;
}
}
}
else {
return false; // path does not start with route
}
}
break;
case route_type::PARAMETERIZED_PREFIX: // /routepath/:param1/:param2
key_size = path.size() - key.prefix_length;
route_size = item.prefix_length;
if (route_size > key_size) {
return false; // /shortpath never matches /long/routepath
}
else {
// route_size <= key_size
if (path.compare(key.prefix_length, route_size, route, 0, route_size) == 0) {
if (route_size == key_size) {
// missing parameters
return false;
}
else {
// route_size < key_size
if (path[key.prefix_length + route_size] == '/') {
// path is a subfolder of route; path matches route
// parameters start at path + key.prefix_length + route_size + 1
switch (item.type) {
case app_type::APP:
case app_type::DISPATCHER:
key.prefix_length += route_size; // points at '/' followed by parameters
break;
case app_type::RAW_MIDDLEWARE:
default:
key.prefix_length += route_size;
break;
}
return true;
}
else {
// path is not a subfolder of route; path starts with route; path does not match route
return false;
}
}
}
else {
// path does not start with route
return false;
}
}
break;
case route_type::REGEX: // TODO
return false;
case route_type::REQUEST: // unexpected route_type in item
default:
return false;
}
}
else {
return false; // method not matched
}
}
}
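/*
The PREFIX case of request::match() above can be sketched in isolation as follows.
The function name sketch_match_prefix and the parameter name consumed are
hypothetical; consumed plays the role of req.key.prefix_length. The bounds check
on consumed is an addition of this sketch (the code above only logs that case).

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Returns true when `route` matches `path` starting at offset `consumed`,
// either exactly or up to a '/' segment boundary. On success the offset is
// advanced past the route, so a nested route can match the remainder.
inline bool sketch_match_prefix(const std::string &path, std::size_t &consumed,
                                const std::string &route) {
  if (consumed > path.size()) {
    return false;                       // offset is out of range
  }
  const std::size_t remaining = path.size() - consumed;
  if (route.size() > remaining) {
    return false;                       // a short path never matches a longer route
  }
  if (path.compare(consumed, route.size(), route) != 0) {
    return false;                       // path does not start with route
  }
  if (route.size() < remaining && path[consumed + route.size()] != '/') {
    return false;                       // "/apiary" starts with "/api" but is not below it
  }
  consumed += route.size();
  return true;
}
```

Starting from consumed = 0, matching "/api" and then "/users" against
"/api/users/42" advances consumed to 4 and then to 10, while "/apiary" is
rejected for the route "/api" and leaves consumed unchanged.
*/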
inline void response::write_head(unsigned int status_code, header_map h) {
status_code_ = status_code;
header_ = std::move(h);
if (!expect_response_body(controller->req.method(), status_code_)) {
is_header_written = true;
res.write_head(status_code_, header_);
}
else {
// calling res.write_head() is pending
}
}
inline void response::end(std::string data) {
#if STREAMING_SUPPORT
#if STREAMING_LOG
std::cerr << "[response::end(std::string data)]: response_filters.size() = " << response_filters.size() << std::endl;
#endif
if (!force_buffered_filtering && response_filters.size() > 0) {
// streaming
#if STREAMING_LOG
std::cerr << "[response::end(std::string data)] streaming with data = " << data << std::endl;
#endif
auto left_ = std::make_shared<std::size_t>(data.size());
auto data_ = std::make_shared<std::string>(std::move(data));
end([data_, left_](uint8_t *buf, size_t len, uint32_t *data_flags) {
auto bytes = std::min(len, *left_);
std::copy_n(data_->data() + data_->size() - *left_, bytes, buf);
*left_ -= bytes;
if (*left_ == 0) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
}
return bytes;
});
return;
}
#if STREAMING_LOG
std::cerr << "[response::end(std::string data)] non-streaming with data = " << data << std::endl;
#endif
#endif // STREAMING_SUPPORT
body = data;
respond_body = [](response &res){
if (!res.is_header_written) {
res.is_header_written = true;
#if STREAMING_LOG
std::cerr << "[response::end(std::string data) respond_body callback] res.res.write_head(); status_code = " << res.status_code() << std::endl;
#endif
res.res.write_head(res.status_code(), res.header());
}
#if STREAMING_LOG
std::cerr << "[response::end(std::string data) respond_body callback] res.res.end(res.body); res.body = " << res.body << std::endl;
#endif
res.res.end(res.body);
};
#if STREAMING_SUPPORT
it = response_filters.begin();
if (it != response_filters.end()) {
#if STREAMING_LOG
std::cerr << "[response::end(std::string data)] calling response_filters " << std::endl;
#endif
if (!it->buffered) {
it->buffered = wrap_stream_cb(it->streamed, it->on_header);
}
(it->buffered)(*this);
}
#else // !STREAMING_SUPPORT
it = response_filters.cbegin();
if (it != response_filters.cend()) {
(*it)(*this);
}
#endif // !STREAMING_SUPPORT
else {
#if STREAMING_LOG
std::cerr << "[response::end(std::string data)] calling respond_body " << std::endl;
#endif
respond_body(*this);
}
}
inline response_cb response::start_response() {
return [](response &res){
#if STREAMING_LOG
std::cerr << "[response::start_response() callback] called" << std::endl;
#endif
if (!res.is_header_written) {
res.is_header_written = true;
res.res.write_head(res.status_code(), res.header());
}
auto left_bytes = std::make_shared<std::size_t>(res.body.size());
res.res.end([&res, left_bytes](uint8_t *buf, std::size_t len, uint32_t *data_flags) {
auto written_bytes = std::min(len, *left_bytes);
std::copy_n(res.body.data() + res.body.size() - *left_bytes, written_bytes, buf);
*left_bytes -= written_bytes;
if (*left_bytes == 0) { // left_bytes is unsigned, so it can only reach zero
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
if (res.is_trailer_expected) {
*data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
res.do_write_trailer();
}
}
return written_bytes;
});
};
}
#if STREAMING_SUPPORT
inline void response::on_response(response_cb cb) {
#if STREAMING_LOG
std::cerr << "[response::on_response(response_cb)]: response_filters.size() = " << response_filters.size() << std::endl;
std::cerr << "[response::on_response(response_cb)]: response_filters.push_front(filter_item{index, cb})" << std::endl;
#endif
response_filters.push_front(filter_item{this->controller->it->index, cb});
force_buffered_filtering = true;
}
inline void response::on_response(response_cb on_header, stream_cb cb) {
#if STREAMING_LOG
std::cerr << "[response::on_response(stream_cb)]: response_filters.size() = " << response_filters.size() << std::endl;
std::cerr << "[response::on_response(stream_cb)]: response_filters.push_front(filter_item{index, cb, on_header})" << std::endl;
#endif
response_filters.push_front(filter_item{this->controller->it->index, cb, on_header});
}
#endif // STREAMING_SUPPORT
inline void response::setup_on_close() {
res.on_close([this](uint32_t err){
for (auto cb: on_close_callbacks) {
cb(err); // close callbacks are called in the reverse order of their registration
}
if (this->controller) {
if (!this->is_push) {
this->controller->req.controller = nullptr;
}
this->controller = nullptr;
}
});
}
inline bool operator<(const request_key &a, const request_key &b) {
return web_app::key_less(a, b);
}
#if STREAMING_SUPPORT
#if GENERATOR_STREAM
class generator_stream;
/*
Helper classes for streaming filters:
Notes on streaming architecture:
- generator_cb (std::function<ssize_t (uint8_t *buf, std::size_t len, uint32_t *data_flags)>) as data source and sink
- Wrapped as generator_cb to be consumed by sink like this (in a rough conceptual depiction):
generator_cb sink() { return filter(generator_cb source); } - nested calls to source generator_cb
generator_stream::filter(buf, len, data_flags) can serve as this filter() function above
- The source end is cb in res.end(generator_cb cb)
- The sink end is the nghttp2 engine's call to generator_cb after res.end(generator_cb)
nghttp2 sink -(call)-> filterN -> filterN-1 -> ... -> filter1 -> source generator_cb
HTTP2 engine <=(data)= filterN <= filterN-1 <= ... <= filter1 <= data from generator_cb
- Back-pressure from sink is achieved by just skipping calls to the filterN callback
- NGHTTP2_ERR_DEFERRED can propagate from source through sink with latency
- Latency of deferred status propagation is controlled by wrapping filters in backyard (see response::end())
class generator_stream: std::iostream on generator_streambuf
- Option #1: Derive from the class to create a custom filter stream with overridden do_*() methods
  - Option #1-2: Use boost_filtering_streambuf_adaptor (derived class defined below) as an adaptor
  - Option #1-3: Add callback lambdas as member variables and call them from do_*() in a derived class,
    which should be conceptually the same as class derivation (with more overhead)
- Option #2: Call the methods (readsome(), check_status(), ...) directly for input buffering
  - If neither resizing nor output buffering is required, this can be the most efficient scheme
- Option #3: Do not use generator_stream if neither buffering nor resizing is required in filtering
  - Input buffer == Output buffer == Filtering workspace
class generator_streambuf: std::streambuf buffer for generator_stream
- This streambuf is customized in a non-standard way to call underflow() prematurely
  to fill the input buffer as much as possible without blocking
  methods: readsome()               - read data from the input buffer without blocking;
                                      can be read into [buf, buf + len) (output buffer) directly
           check_status(data_flags) - returns an nghttp2 error code if required
  sink <= |output buffer| <= (filter) <= |input buffer| <= source
           write()       do_peek()   - peek the input buffer for the next chunk (filterable unit)
           operator <<   do_filter() - filter a chunk and write it to the output buffer
                         do_close()  - flush the filter buffer (if any) to the output buffer on EOF
                         override do_*() to build a custom filter
Input buffer:
  _M_gbuf (std::string)
  |read          |buffered        |empty                       |
  eback()        gptr()           egptr()                      _M_gbuf[_M_gbuf.size() - 1]
                 ^                ^
                 read from here   fill from source (generator_cb)
Output buffer:
  non-overflowing mode: output buffer [buf, buf + len) is being filled
    generator_cb buffer                     overflow buffer
    |written       |               |   +    |                              |
    buf            pptr()          buf+len  _M_pbuf_overflow (std::string)
    =_M_pbuf       ^
    =pbase()       write from here
  overflowing mode: [buf, buf + len) is returned to the caller sink (generator_cb) on return
    generator_cb buffer                     overflow buffer (expands on demand)
    |written (filled)              |   +    |written        | empty        |
    buf=_M_pbuf                    buf+len  _M_pbuf_overflow (std::string)
                                            ^               ^              ^
                                            pbase()         pptr()         epptr()
                                                            write from here
  overflowing mode before setbuf() and after flush():
    _M_pbuf = nullptr
    overflow buffer (expands on demand)
    |written        | empty        |
    _M_pbuf_overflow (std::string)
    ^               ^              ^
    pbase()         pptr()         epptr()
                    write from here
*/
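/*
The source/filter/sink nesting described in the notes above can be sketched
without nghttp2 as follows. This is only an illustration of Option #3 (filtering
in place, no extra buffering). The names sketch_generator_cb, SKETCH_FLAG_EOF,
sketch_string_source and sketch_upper_filter are hypothetical stand-ins and are
not part of nicexprs.h or nghttp2; SKETCH_FLAG_EOF stands in for
NGHTTP2_DATA_FLAG_EOF, and ssize_t is assumed to come from a POSIX header.

```cpp
#include <algorithm>
#include <cassert>
#include <cctype>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <sys/types.h>  // ssize_t (POSIX)
#include <utility>

// Stand-ins so that the sketch compiles without nghttp2 (assumptions).
using sketch_generator_cb =
    std::function<ssize_t(uint8_t *buf, std::size_t len, uint32_t *data_flags)>;
const uint32_t SKETCH_FLAG_EOF = 0x01;

// Source: serves a string chunk by chunk, like response::end(std::string data).
inline sketch_generator_cb sketch_string_source(std::string data) {
  auto data_ = std::make_shared<std::string>(std::move(data));
  auto left_ = std::make_shared<std::size_t>(data_->size());
  return [data_, left_](uint8_t *buf, std::size_t len, uint32_t *data_flags) -> ssize_t {
    const std::size_t bytes = std::min(len, *left_);
    std::copy_n(data_->data() + data_->size() - *left_, bytes, buf);
    *left_ -= bytes;
    if (*left_ == 0) {
      *data_flags |= SKETCH_FLAG_EOF;  // nothing left to send
    }
    return static_cast<ssize_t>(bytes);
  };
}

// Filter: pulls from the source into the caller's buffer and upper-cases it
// in place, so input buffer == output buffer == filtering workspace.
inline sketch_generator_cb sketch_upper_filter(sketch_generator_cb source) {
  return [source](uint8_t *buf, std::size_t len, uint32_t *data_flags) -> ssize_t {
    const ssize_t n = source(buf, len, data_flags);
    if (n > 0) {
      std::transform(buf, buf + n, buf, [](uint8_t c) {
        return static_cast<uint8_t>(std::toupper(c));
      });
    }
    return n;  // errors and flags from the source pass through unchanged
  };
}
```

A sink that repeatedly calls sketch_upper_filter(sketch_string_source("hello http2"))
with a 4-byte buffer until the EOF flag is set collects "HELLO HTTP2". Filters that
change the data size or need look-ahead cannot work in place like this, which is
where generator_stream and its buffers come in.
*/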
class generator_streambuf: public std::streambuf {
public:
friend class generator_stream;
// for reading and writing
generator_streambuf(generator_cb generator, std::streamsize buf_size = 16384)
: _M_generator(generator),
_M_buf_size(buf_size),
_M_mode(std::ios::in | std::ios::out),
_M_state(STATE_INITIAL),
_M_gbuf(buf_size, '\0'),
_M_pbuf(nullptr),
_M_pbuf_size(0),
_M_pbuf_overflow(buf_size, '\0') {
#if STREAMING_LOG
std::cerr << "[generator_streambuf() for reading and writing] buf_size = " << buf_size << std::endl;
std::cerr << "[generator_streambuf() for reading and writing] _M_gbuf.size() = " << _M_gbuf.size() << std::endl;
#endif
auto gbuf_begin = &_M_gbuf[0];
auto gbuf_end = gbuf_begin + _M_gbuf.size();
setg(gbuf_begin, gbuf_end, gbuf_end); // underflow state
#if STREAMING_LOG
std::cerr << "[generator_streambuf() for reading and writing] overflow buf_size = " << buf_size << std::endl;
#endif
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size()); // empty
}
typedef char *char_ptr;
std::string &peek(char_ptr &_gptr, char_ptr &_egptr) {
_gptr = gptr();
_egptr = egptr();
return _M_gbuf;
}
std::streamsize buf_size() {
return _M_buf_size;
}
bool is_overflowing() {
return pbase() == &_M_pbuf_overflow[0];
}
std::streamsize out_avail() { // excluding overflow buffer
if (is_overflowing()) {
return _M_pbuf_size;
}
else {
return pptr() - pbase();
}
}
std::streamsize out_avail_including_overflow() { // including overflow buffer
if (is_overflowing()) {
return _M_pbuf_size + (pptr() - pbase());
}
else {
return pptr() - pbase();
}
}
enum generator_state {
STATE_INITIAL,
STATE_ERROR,
STATE_EOF,
STATE_OPEN,
STATE_DEFER
};
generator_state state() {
return _M_state;
}
protected:
generator_cb _M_generator;
std::streamsize _M_buf_size;
std::ios_base::openmode _M_mode;
generator_state _M_state;
std::string _M_gbuf;
char *_M_pbuf; // not the owner of the buffer; possibly pointing within _M_gbuf.data() of the next generator_streambuf object in the filter chain
std::streamsize _M_pbuf_size;
std::string _M_pbuf_overflow;
virtual std::streampos seekoff(
std::streamoff off,
std::ios_base::seekdir way,
std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) {
if (which & std::ios_base::in) {
if (way == std::ios_base::cur) {
auto next_gptr = gptr() + off;
if (eback() <= next_gptr && next_gptr <= egptr()) { // stay within the get area in both directions
setg(eback(), next_gptr, egptr());
return gptr() - eback(); // dummy offset value
}
}
}
if (which & std::ios_base::out) {
// not implemented
}
return (pos_type(off_type(EOF)));
}
virtual std::streamsize showmanyc() {
std::streamsize _ret = 0;
switch (_M_state) {
case STATE_ERROR:
_ret = -1;
break;
case STATE_EOF:
_ret = egptr() - gptr();
if (_ret <= 0) {
_ret = -1;
}
break;
case STATE_INITIAL:
case STATE_OPEN:
underflow(); // non-standard way of buffering to handle non-blocking I/O for generator
switch (_M_state) {
case STATE_ERROR:
_ret = -1;
break;
case STATE_EOF:
_ret = egptr() - gptr();
if (_ret <= 0) {
_ret = -1;
}
break;
case STATE_INITIAL:
case STATE_OPEN:
_ret = egptr() - gptr();
break;
case STATE_DEFER:
_ret = egptr() - gptr();
if (_ret == 0) {
_M_state = STATE_OPEN;
}
break;
default:
_ret = 0;
break;
}
break;
case STATE_DEFER:
_ret = egptr() - gptr();
if (_ret == 0) {
_M_state = STATE_OPEN;
}
break;
default:
_ret = 0;
break;
}
#if STREAMING_LOG
std::cerr << "[generator_streambuf::showmanyc()] _ret = " << _ret << std::endl;
#endif
return _ret;
}
virtual int_type underflow() {
#if STREAMING_LOG
std::cerr << "[generator_streambuf::underflow()] " << std::endl;
std::string st;
switch (_M_state) {
case STATE_INITIAL: st = "INITIAL"; break;
case STATE_ERROR: st = "ERROR"; break;
case STATE_EOF: st = "EOF"; break;
case STATE_OPEN: st = "OPEN"; break;
case STATE_DEFER: st = "DEFER"; break;
default: st = "unknown"; break;
}
std::cerr << "[generator_streambuf::underflow()] _M_state = " << st << std::endl;
#endif
int_type _ret = traits_type::eof();
if (_M_state == STATE_ERROR) {
return _ret;
}
auto curr_ptr = gptr();
if (_M_state == STATE_EOF) {
if (!curr_ptr || curr_ptr >= egptr()) {
// no more readable data in buffer
#if STREAMING_LOG
std::cerr << "[generator_streambuf::underflow()] no more readable data in buffer " << std::endl;
#endif
_ret = traits_type::eof();
}
else {
#if STREAMING_LOG
std::cerr << "[generator_streambuf::underflow()] readable data remain in buffer : " << (egptr() - curr_ptr) << std::endl;
#endif
_ret = traits_type::to_int_type(*curr_ptr); // readable data remain in buffer
}
}
else {
auto gbuf_begin = &_M_gbuf[0];
auto gbuf_end = gbuf_begin + _M_gbuf.size();
if (!curr_ptr || curr_ptr >= gbuf_begin) {
if (!curr_ptr) {
curr_ptr = gbuf_begin;
setg(gbuf_begin, curr_ptr, curr_ptr);
}
if (gbuf_end <= curr_ptr) {
curr_ptr = gbuf_begin; // rewind the read ptr
setg(gbuf_begin, curr_ptr, curr_ptr);
}
else if (gbuf_begin < curr_ptr) {
auto bytes_in_buffer = egptr() - curr_ptr;
std::copy_n(curr_ptr, bytes_in_buffer, gbuf_begin); // move the remaining bytes to gbuf_begin
curr_ptr = gbuf_begin;
setg(gbuf_begin, curr_ptr, curr_ptr + bytes_in_buffer);
#if STREAMING_LOG
std::cerr << "[generator_streambuf::underflow()] move remaining bytes to gbuf_begin; bytes = " << bytes_in_buffer << std::endl;
#endif
}
#if STREAMING_LOG
if (_M_state == STATE_DEFER)
std::cerr << "[generator_streambuf::underflow()] state updated from DEFER to OPEN " << std::endl;
#endif
_M_state = STATE_OPEN;
std::size_t buf_rest = gbuf_end - egptr();
#if STREAMING_LOG
std::cerr << "[generator_streambuf::underflow()] _M_gbuf.size() = " << _M_gbuf.size() << std::endl;
std::cerr << "[generator_streambuf::underflow()] buf_rest = " << buf_rest << std::endl;
#endif
uint32_t data_flags = 0;
ssize_t bytes = 0; // initialized: the loop below never assigns it when buf_rest == 0
auto append_ptr = egptr();
// non-blockingly fill buffer from generator
while (buf_rest > 0 && (bytes = _M_generator((uint8_t *)append_ptr, buf_rest, &data_flags)) > 0) {
#if STREAMING_LOG
std::cerr << "[generator_streambuf::underflow()] _M_generator bytes = " << bytes << std::endl;
std::cerr << "[generator_streambuf::underflow()] _M_generator buf_rest = " << buf_rest << std::endl;
#endif
append_ptr += bytes;
buf_rest -= bytes;
setg(gbuf_begin, curr_ptr, append_ptr);
data_flags = 0;
#if STREAMING_LOG
std::cerr << "[generator_streambuf::underflow()] append_ptr - curr_ptr = " << (append_ptr - curr_ptr) << std::endl;
#endif
}
if (bytes == 0 || buf_rest == 0) {
if (data_flags & NGHTTP2_DATA_FLAG_EOF) {
// EOF
#if STREAMING_LOG
std::cerr << "[generator_streambuf::underflow()] EOF data_flags = " << data_flags << std::endl;
#endif
_M_state = STATE_EOF;
}
auto readable_bytes = append_ptr - curr_ptr;
#if STREAMING_LOG
std::cerr << "[generator_streambuf::underflow()] out of generator read loop; append_ptr - curr_ptr = " << readable_bytes << std::endl;
#endif
setg(gbuf_begin, curr_ptr, append_ptr);
if (readable_bytes > 0) {
_ret = traits_type::to_int_type(*curr_ptr);
}
}
else if (bytes < 0) {
if (bytes == NGHTTP2_ERR_DEFERRED) {
if (append_ptr - curr_ptr > 0) {
_ret = traits_type::to_int_type(*curr_ptr);
}
#if STREAMING_LOG
if (_M_state == STATE_OPEN)
std::cerr << "[generator_streambuf::underflow()] state updated from OPEN to DEFER" << std::endl;
#endif
_M_state = STATE_DEFER;
}
else { // errors
if (append_ptr - curr_ptr > 0) {
_ret = traits_type::to_int_type(*curr_ptr);
}
_M_state = STATE_ERROR;
throw NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
}
}
else {
#if STREAMING_LOG
std::cerr << "[generator_streambuf::underflow()] noop in underflow()" << std::endl;
#endif
_ret = traits_type::to_int_type(*curr_ptr);
}
}
#if STREAMING_LOG
std::cerr << "[generator_streambuf::underflow()] _ret = " << _ret << std::endl;
#endif
return _ret;
}
virtual std::streambuf *setbuf(char *buf_begin, std::streamsize buf_size) {
// discard the current non-overflowing buffer, which may be invalid
_M_pbuf = buf_begin;
_M_pbuf_size = buf_size;
if (is_overflowing()) {
// fill with the overflowing data
std::streamsize overflowing_size = pptr() - pbase();
if (overflowing_size <= _M_pbuf_size) {
std::copy_n(&_M_pbuf_overflow[0], overflowing_size, _M_pbuf);
if (overflowing_size < _M_pbuf_size) {
// non-overflowing mode
setp(_M_pbuf, _M_pbuf + _M_pbuf_size);
pbump(overflowing_size);
}
else {
// overflowing mode
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size());
}
}
else {
// still in overflowing mode
std::streamsize new_overflowing_size = overflowing_size - _M_pbuf_size;
// fill non-overflowing buffer with the overflow buffer data
std::copy_n(&_M_pbuf_overflow[0], _M_pbuf_size, _M_pbuf);
// move the data that did not fit into _M_pbuf to the head of the overflow buffer
std::copy_n(&_M_pbuf_overflow[0] + _M_pbuf_size, new_overflowing_size, &_M_pbuf_overflow[0]);
// update the pointers
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size());
pbump(new_overflowing_size);
}
}
else {
// update the pointers
// non-overflowing mode
setp(_M_pbuf, _M_pbuf + _M_pbuf_size);
}
return this;
}
virtual int_type overflow(int_type c = traits_type::eof()) {
const bool c_is_eof = traits_type::eq_int_type(c, traits_type::eof());
if (pptr() < epptr()) {
// room in buffer
}
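else {
// no room in the current buffer
std::streamsize overflow_size = 0;
if (is_overflowing()) {
// expand the overflow buffer, keeping the bytes already written to it
overflow_size = pptr() - pbase();
_M_pbuf_overflow.resize(_M_pbuf_overflow.size() * 2); // double the current size
}
else {
// switch to overflowing mode
}
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size());
pbump(static_cast<int>(overflow_size)); // setp() resets pptr() to pbase(); restore the write position
}
if (!c_is_eof) {
*pptr() = traits_type::to_char_type(c); // store the character first, then advance
pbump(1);
}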
return traits_type::not_eof(c);
}
virtual std::streamsize xsputn(const char *str, std::streamsize bytes) {
#if STREAMING_LOG
std::cerr << "[generator_streambuf::xsputn] bytes = " << bytes << std::endl;
#if STREAMING_DUMP
std::cerr << "[generator_streambuf::xsputn] output = \n\"";
std::cerr.write(str, bytes);
std::cerr << "\" end of output" << std::endl;
#endif
#endif
if (is_overflowing()) {
#if STREAMING_LOG
std::cerr << "[generator_streambuf::xsputn] is_overflowing = true" << std::endl;
#endif
if (pptr() + bytes <= epptr()) {
// within the overflow buffer
#if STREAMING_LOG
std::cerr << "[generator_streambuf::xsputn] within the overflow buffer" << std::endl;
#endif
std::copy_n(str, bytes, pptr());
pbump(bytes);
}
else {
// overflows the current overflow buffer
#if STREAMING_LOG
std::cerr << "[generator_streambuf::xsputn] overflows the current overflow buffer; epptr() - pptr() = " << (epptr() - pptr())
<< " _M_pbuf_overflow.size() = " << _M_pbuf_overflow.size()
<< " pptr() - &_M_pbuf_overflow[0] = " << (pptr() - &_M_pbuf_overflow[0]) << std::endl;
#endif
std::streamsize current_overflow_size = pptr() - pbase();
std::streamsize required_overflow_size = current_overflow_size + bytes;
std::streamsize targeted_overflow_size = _M_pbuf_overflow.size();
while (targeted_overflow_size < required_overflow_size) {
targeted_overflow_size *= 2;
}
// expand the overflow buffer
#if STREAMING_LOG
std::cerr << "[generator_streambuf::xsputn] expands the overflow buffer; targeted_overflow_size = " << targeted_overflow_size << std::endl;
#endif
_M_pbuf_overflow.resize(targeted_overflow_size);
// update pointers
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size());
pbump(current_overflow_size);
// write to the expanded buffer and advance the write position
std::copy_n(str, bytes, pptr());
pbump(bytes);
}
}
else {
// not in the overflowing mode
#if STREAMING_LOG
std::cerr << "[generator_streambuf::xsputn] is_overflowing = false" << std::endl;
#endif
if (pptr() + bytes <= epptr()) {
// within the _M_pbuf buffer
#if STREAMING_LOG
std::cerr << "[generator_streambuf::xsputn] pptr() + bytes <= epptr(); within the _M_pbuf buffer" << std::endl;
#endif
std::copy_n(str, bytes, pptr());
pbump(bytes);
if (pptr() >= epptr()) {
// transition to overflowing mode
#if STREAMING_LOG
std::cerr << "[generator_streambuf::xsputn] pptr() >= epptr() after writing; transition to overflowing mode" << std::endl;
#endif
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size());
}
}
else {
// split into 2 buffers
std::streamsize remaining_buffer_size = epptr() - pptr();
std::streamsize overflowing_size = bytes - remaining_buffer_size;
#if STREAMING_LOG
std::cerr << "[generator_streambuf::xsputn] split into 2 buffers; "
<< "remaining_buffer_size = " << remaining_buffer_size
<< " overflowing_size = " << overflowing_size << std::endl;
#endif
std::copy_n(str, remaining_buffer_size, pptr());
pbump(remaining_buffer_size);
// put the remaining data into the overflow buffer
if (overflowing_size <= _M_pbuf_overflow.size()) {
// within the current overflow buffer
}
else {
// expand the current overflow buffer
std::streamsize required_overflow_size = overflowing_size;
std::streamsize targeted_overflow_size = _M_pbuf_overflow.size();
while (targeted_overflow_size < required_overflow_size) {
targeted_overflow_size *= 2;
}
#if STREAMING_LOG
std::cerr << "[generator_streambuf::xsputn] expands the overflow buffer; targeted_overflow_size = " << targeted_overflow_size << std::endl;
#endif
_M_pbuf_overflow.resize(targeted_overflow_size);
}
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size());
std::copy_n(str + remaining_buffer_size, overflowing_size, pptr());
pbump(overflowing_size);
}
}
return bytes;
}
virtual int sync() {
if (_M_mode & std::ios::out) {
#if STREAMING_LOG
std::cerr << "[generator_streambuf::sync] out_avail = " << out_avail() << std::endl;
#endif
// invalidate the contents of _M_pbuf
if (!is_overflowing()) {
// not overflowing
// transition to the overflowing mode
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size());
}
// invalidate the _M_pbuf (not owning)
_M_pbuf = nullptr;
_M_pbuf_size = 0;
return 0;
}
else {
#if STREAMING_LOG
std::cerr << "[generator_streambuf::sync] in; noop" << std::endl;
#endif
return 0;
}
}
};
class generator_stream: public std::iostream, public std::enable_shared_from_this<generator_stream> {
public:
generator_stream(generator_cb generator, std::streamsize buf_size = 16384) // nghttp2 responds in chunks of 16384 bytes at most
: std::iostream(&_M_generator_buf), _M_generator_buf(generator, buf_size) { // base class is initialized first; it only stores the pointer
init(&_M_generator_buf);
}
virtual ~generator_stream() {
#if STREAMING_LOG
std::cerr << "[generator_stream::~generator_stream()] destructing" << std::endl;
#endif
}
ssize_t check_status(uint32_t *data_flags) {
auto bytes = gcount();
#if STREAMING_LOG
std::cerr << "[generator_stream:check_status()] gcount() = " << bytes << std::endl;
#endif
if (bytes == 0) {
if (eof()) {
#if STREAMING_LOG
std::cerr << "[generator_stream:check_status()] eof() = true" << std::endl;
#endif
if (fail()) {
#if STREAMING_LOG
std::cerr << "[generator_stream:check_status()] fail() = true; NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE" << std::endl;
#endif
bytes = (ssize_t)NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
else {
#if STREAMING_LOG
std::cerr << "[generator_stream:check_status()] fail() = false; NGHTTP2_DATA_FLAG_EOF" << std::endl;
#endif
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
}
}
else if (good()) {
#if STREAMING_LOG
std::cerr << "[generator_stream:check_status()] good() = true; NGHTTP2_ERR_DEFERRED" << std::endl;
#endif
// bytes == 0 and good(), i.e., DEFERRED
bytes = (ssize_t)NGHTTP2_ERR_DEFERRED;
}
else {
#if STREAMING_LOG
std::cerr << "[generator_stream:check_status()] good() = false; NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE" << std::endl;
#endif
bytes = (ssize_t)NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
}
else {
// bytes > 0
#if STREAMING_LOG
std::cerr << "[generator_stream:check_status()] bytes = " << bytes << " > 0 ; " << std::endl;
#endif
if (eof()) {
#if STREAMING_LOG
std::cerr << "[generator_stream:check_status()] bytes > 0 & eof() = true; NGHTTP2_DATA_FLAG_EOF" << std::endl;
#endif
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
}
}
return bytes;
}
bool is_overflowing() {
return _M_generator_buf.is_overflowing();
}
generator_streambuf *setbuf(uint8_t *buf, std::size_t len) {
return (generator_streambuf *)(_M_generator_buf.pubsetbuf((char *)buf, (std::streamsize)len));
}
std::streamsize peek_and_filter(bool &incomplete, bool &deferred) {
try {
incomplete = false;
deferred = false;
auto in_avail = _M_generator_buf.in_avail();
if (in_avail < 0) {
// EOF
do_close();
return -1;
}
char *consumable_begin;
char *consumable_end;
std::string &buf = _M_generator_buf.peek(consumable_begin, consumable_end);
bool found = do_peek(buf, consumable_begin, consumable_end);
std::streamsize consumable_size;
if (found) {
consumable_size = consumable_end - consumable_begin;
#if STREAMING_LOG
std::cerr << "[generator_stream::peek_and_filter()] consumable data found; consumable size = " << consumable_size << std::endl;
#endif
auto filtered = do_filter(consumable_begin, consumable_size);
}
else {
generator_streambuf::generator_state state = _M_generator_buf.state();
if (state == generator_streambuf::STATE_EOF) {
// EOF
incomplete = true;
consumable_size = consumable_end - consumable_begin;
#if STREAMING_LOG
std::cerr << "[generator_stream::peek_and_filter()] consumable data not found; EOF; consumable size = " << consumable_size << std::endl;
#endif
auto filtered = do_filter(consumable_begin, consumable_size);
do_close();
}
else if (state == generator_streambuf::STATE_DEFER) {
// DEFER
#if STREAMING_LOG
std::cerr << "[generator_stream::peek_and_filter()] consumable data not found; DEFER" << std::endl;
#endif
incomplete = true;
deferred = true;
consumable_size = 0;
}
else {
consumable_size = consumable_end - consumable_begin;
#if STREAMING_LOG
std::cerr << "[generator_stream::peek_and_filter()] consumable data not found; buf.size() = " << buf.size() << " consumable_size = " << consumable_size << std::endl;
#if STREAMING_DUMP
std::cerr << "[generator_stream::peek_and_filter()] consumable data = \"";
std::cerr.write(consumable_begin, consumable_size);
std::cerr << "\"" << std::endl;
#endif
#endif
if (buf.size() > consumable_size) {
// not EOF, buffer not full
if (state == generator_streambuf::STATE_DEFER) {
#if STREAMING_LOG
std::cerr << "[generator_stream::peek_and_filter()] state = DEFER; consumable data not found; consume 0 bytes" << std::endl;
#endif
incomplete = true;
deferred = true;
consumable_size = 0;
}
else {
#if STREAMING_LOG
std::cerr << "[generator_stream::peek_and_filter()] consumable data not found; consume 0 bytes" << std::endl;
#endif
incomplete = true;
consumable_size = 0;
}
}
else {
// not EOF, buffer full
incomplete = true;
// give up recognizing a newline
consumable_size = consumable_end - consumable_begin;
auto filtered = do_filter(consumable_begin, consumable_size);
}
}
}
#if STREAMING_LOG
std::cerr << "[generator_stream::peek_and_filter()] returning consumable_size = " << consumable_size << std::endl;
#if STREAMING_DUMP
std::cerr << "[generator_stream::peek_and_filter()] consuming \"";
std::cerr.write(consumable_begin, consumable_size);
std::cerr << "\"" << std::endl;
#endif
#endif
return consumable_size;
}
catch (...) {
#if STREAMING_LOG
std::cerr << "[generator_stream::peek_and_filter()] exception thrown" << std::endl;
#endif
setstate(std::ios_base::failbit);
return 0;
}
}
// override do_peek to detect transformable chunks in buffer
virtual bool do_peek(std::string &buf, char *&begin, char *&end) {
// begin must not be changed
return true;
}
// override do_filter to implement transformation
virtual std::streamsize do_filter(char *in_beg, std::streamsize in_size) {
write(in_beg, in_size); // identity filter
return in_size;
}
// override do_close to flush and close intermediate buffers
virtual void do_close() {}
std::streamsize filter(uint8_t *buf, std::size_t len, uint32_t *data_flags) {
#if STREAMING_LOG
std::cerr << "[generator_stream:filter()] len = " << len << std::endl;
#endif
if (fail()) {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
setbuf(buf, len);
if (is_overflowing()) {
flush();
return len;
}
auto out_avail = _M_generator_buf.out_avail();
if (eof()) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
flush();
if (out_avail > 0) {
return out_avail;
}
else {
return 0;
}
}
// not overflowing, not eof
std::streamsize consumed = 0;
bool is_incomplete = false;
bool is_deferred = false;
generator_streambuf::generator_state state = _M_generator_buf.state();
if (state == generator_streambuf::STATE_DEFER) {
#if STREAMING_LOG
std::cerr << "[generator_stream::filter()] buf state = DEFER; resetting state by underflow and showmanyc" << std::endl;
#endif
try {
_M_generator_buf.underflow();
_M_generator_buf.showmanyc();
}
catch (...) {
setstate(std::ios_base::failbit);
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
}
while (!is_overflowing()) {
consumed = peek_and_filter(is_incomplete, is_deferred);
if (consumed == -1) {
// EOF
#if STREAMING_LOG
std::cerr << "[generator_stream::filter()] peek_and_filter returned EOF = " << consumed << std::endl;
#endif
break;
}
else if (consumed == 0) {
// INCOMPLETE or DEFER or ERROR
#if STREAMING_LOG
std::cerr << "[generator_stream::filter()] peek_and_filter returned 0" << std::endl;
#endif
if (is_incomplete && !is_deferred) {
try {
_M_generator_buf.overflow();
_M_generator_buf.showmanyc();
}
catch (...) {
setstate(std::ios_base::failbit);
break;
}
continue;
}
break;
}
_M_generator_buf.pubseekoff(consumed, std::ios_base::cur, std::ios_base::in);
}
if (fail()) {
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE;
}
#if STREAMING_LOG
if (consumed == 0) {
std::cerr << "[generator_stream::filter()] consumed = 0" << std::endl;
}
#endif
out_avail = _M_generator_buf.out_avail();
if (out_avail > 0) {
flush();
return out_avail;
}
else {
// out_avail == 0
if (consumed < 0) {
*data_flags |= NGHTTP2_DATA_FLAG_EOF;
flush();
return 0;
}
else if (consumed == 0) {
flush();
#if STREAMING_LOG
std::cerr << "[generator_stream::filter()] returning NGHTTP2_ERR_DEFERRED" << std::endl;
#endif
return NGHTTP2_ERR_DEFERRED;
}
else {
// consumed > 0 but out_avail == 0
flush();
#if STREAMING_LOG
std::cerr << "[generator_stream::filter()] returning 0" << std::endl;
#endif
return 0;
}
}
}
protected:
generator_streambuf _M_generator_buf;
};
#endif // GENERATOR_STREAM
#endif // STREAMING_SUPPORT
#if STREAMING_SUPPORT
#if BOOST_FILTER
// Adaptor for a chain of one or more boost::iostreams::filtering_streambuf<output> filters
class boost_filtering_streambuf_adaptor: public generator_stream {
public:
template <class... Filter>
boost_filtering_streambuf_adaptor(generator_cb cb, Filter&&... filters)
: generator_stream(cb),
_M_filtering_streambuf(),
_M_sink(*this) {
push(std::forward<Filter>(filters)...);
}
template <class... Filter>
boost_filtering_streambuf_adaptor(generator_cb cb, std::streamsize buf_size, Filter&&... filters)
: generator_stream(cb, buf_size),
_M_filtering_streambuf(),
_M_sink(*this) {
push(std::forward<Filter>(filters)...);
}
class generator_stream_sink: public boost::iostreams::sink {
public:
generator_stream_sink(boost_filtering_streambuf_adaptor &adaptor): adaptor(adaptor) {}
~generator_stream_sink() {
#if STREAMING_LOG
std::cerr << "[generator_stream_sink] destructing" << std::endl;
#endif
}
std::streamsize write(const char* s, std::streamsize n)
{
#if STREAMING_LOG
std::cerr << "[generator_stream_sink] write n = " << n << std::endl;
#if STREAMING_DUMP
std::cerr << "[generator_stream_sink] output = \n\"";
std::cerr.write(s, n);
std::cerr << "\" end of output" << std::endl;
#endif
#endif
adaptor.write(s, n);
return n;
}
protected:
boost_filtering_streambuf_adaptor &adaptor;
};
generator_stream_sink &sink() {
#if STREAMING_LOG
std::cerr << "[generator_stream:sink()]" << std::endl;
#endif
return _M_sink;
}
boost::iostreams::filtering_streambuf<boost::iostreams::output> &get_filter() {
#if STREAMING_LOG
std::cerr << "[generator_stream:get_filter()]" << std::endl;
#endif
return _M_filtering_streambuf;
}
bool do_peek(std::string &buf, char *&begin, char *&end) {
return true;
}
std::streamsize do_filter(char *in_beg, std::streamsize in_size) {
#if STREAMING_LOG
std::cerr << "[generator_stream::do_filter()] forwarding to filtering_streambuf->sputn() in_size = " << in_size << std::endl;
#endif
return _M_filtering_streambuf.sputn(in_beg, in_size);
}
void do_close() {
boost::iostreams::close(_M_filtering_streambuf);
}
protected:
void push() {
get_filter().push(sink(), _M_generator_buf.buf_size());
}
template <class Head, class... Tail>
void push(Head &&head, Tail&&... tail) {
get_filter().push(head, _M_generator_buf.buf_size());
push(std::forward<Tail>(tail)...);
}
boost::iostreams::filtering_streambuf<boost::iostreams::output> _M_filtering_streambuf;
generator_stream_sink _M_sink;
};
#endif // BOOST_FILTER
#endif // STREAMING_SUPPORT
} // namespace nicexprs
#endif // NICEXPRS_H