Last active
April 30, 2022 04:28
-
-
Save t2ym/bf9acf06d5ae36338f8330fafd11dadb to your computer and use it in GitHub Desktop.
nicexprs.h: a single header file ExpressJs-like middleware API on nghttp2 asio library
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright (c) 2021, Tetsuya Mori <t2y3141592@gmail.com> | |
* All rights reserved. | |
* | |
* Redistribution and use in source and binary forms, with or without | |
* modification, are permitted provided that the following conditions are met: | |
* | |
* 1. Redistributions of source code must retain the above copyright notice, | |
* this list of conditions and the following disclaimer. | |
* 2. Redistributions in binary form must reproduce the above copyright notice, | |
* this list of conditions and the following disclaimer in the documentation | |
* and/or other materials provided with the distribution. | |
* | |
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND | |
* ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED | |
* WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE | |
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR | |
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES | |
* (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; | |
* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND | |
* ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT | |
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS | |
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. | |
* | |
*/ | |
/* | |
nicexprs.h: a single header file ExpressJs-like middleware API on nghttp2 asio library | |
# Dependencies | |
- nghttp2 asio library and its dependencies (boost, openssl, etc.) | |
- --enable-asio-lib must be enabled in nghttp2 configuration | |
# Features | |
- HTTP2 server middleware framework on top of the nghttp2 asio library | |
- No changes to the base nghttp2 asio library | |
- Similar to ExpressJs API | |
- Compatible with C++14 and later | |
# TODOs | |
- TBD | |
*/ | |
/* | |
# Change Log | |
To be converted to CHANGELOG.md in a dedicated project | |
## [Unreleased] | |
### Added | |
### Changed | |
### Removed | |
### Fixed | |
## [0.0.10] - 2021-05-18 | |
### Added | |
- Support streaming filters if STREAMING_SUPPORT macro is defined as truthy | |
- response::on_response(response_cb on_header, stream_cb cb) | |
- register a streaming filter callback with its corresponding on-header callback | |
- If STREAMING_SUPPORT macro is undefined or 0, nicexprs.h is almost the same as the previous version 0.0.9 | |
- Define std::iostream adaptor class generator_stream if GENERATOR_STREAM macro is defined as truthy | |
- Define boost_filtering_streambuf_adaptor class if BOOST_FILTER macro is defined as truthy | |
### Changed | |
- Change type of response::response_filters as std::list<filter_item> from std::list<response_cb> | |
- filter_item contains response_cb for buffered filtering, | |
or stream_cb for streamed filtering and another response_cb for header filtering in streaming | |
### Removed | |
- Remove unimplemented web_app::static_file() | |
## [0.0.9] - 2021-05-17 | |
### Fixed | |
- Fix the segmentation fault issue on popping an item from empty std::list<quadple> | |
## [0.0.8] - 2021-05-10 | |
### Fixed | |
- Fix the regression issue of empty deferred response | |
## [0.0.7] - 2021-05-04 | |
### Added | |
- Bypass response buffering if response filters are empty | |
## [0.0.6] - 2021-05-02 | |
### Added | |
- Support multiple on_close callbacks std::list<close_cb> response::on_close_callbacks invoked in the reversed order of their registrations | |
### Removed | |
- response::is_on_close_set | |
- response::on_close_ | |
## [0.0.5] - 2021-04-28 | |
### Added | |
- Add web_app.get(std::string path) and web_app.post(std::string path) | |
## [0.0.4] - 2021-04-27 | |
### Added | |
- Add extensible req.helper to store and manipulate data per request | |
## [0.0.3] - 2021-04-25 | |
### Changed | |
- Work around SIGSEGV when DUMP_MIDDLEWARE_ITEM macro is not defined | |
## [0.0.2] - 2021-04-25 | |
### Added | |
- DUMP_MIDDLEWARE_ITEM macro to switch on/off dumping middleware_items | |
## [0.0.1] - 2021-04-25 | |
### Added | |
- Initial PoC version as a single header file | |
- Subject to drastic changes | |
*/ | |
#ifndef NICEXPRS_H | |
#define NICEXPRS_H | |
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <list>
#include <map>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
#include <boost/stacktrace.hpp>
#include <boost/algorithm/string.hpp>
#include <nlohmann/json.hpp>
#include <openssl/sha.h>
#include <boost/archive/iterators/base64_from_binary.hpp>
#include <boost/archive/iterators/transform_width.hpp>
#if STREAMING_SUPPORT
#if BOOST_FILTER
#include <boost/iostreams/filtering_streambuf.hpp>
#endif
#endif
#include <nghttp2/asio_http2_server.h>
namespace nicexprs { | |
using json = nlohmann::json; | |
namespace h2a = nghttp2::asio_http2; | |
namespace h2s = nghttp2::asio_http2::server; | |
class web_app; | |
struct middleware_item; | |
#if STREAMING_SUPPORT | |
struct filter_item; | |
struct deffered_status; | |
#endif | |
class middleware_controller; | |
class helper_base; | |
class request; | |
class response; | |
typedef h2s::http2 http2; | |
typedef h2s::request_cb request_cb; | |
typedef h2a::header_map header_map; | |
typedef h2a::header_value header_value; | |
typedef h2a::uri_ref uri_ref; | |
typedef h2a::generator_cb generator_cb; | |
typedef h2a::close_cb close_cb; | |
typedef h2a::data_cb data_cb; | |
typedef std::size_t middleware_index; | |
typedef std::shared_ptr<web_app> web_app_ptr; | |
typedef std::function<void (request &req, response &res)> middleware_cb; | |
typedef std::function<void (response &res)> response_cb; | |
#if STREAMING_SUPPORT | |
typedef std::function<generator_cb (response &res, generator_cb src)> stream_cb; | |
typedef std::list<filter_item> filter_chain; | |
#endif | |
typedef std::function<void (response &res, std::string &method, std::string &raw_path_query, header_map &header)> push_cb; // synchronous callback | |
typedef std::list<middleware_item> middleware_chain; | |
typedef std::vector<middleware_item> middleware_vector; | |
// How request::next() advances through the middleware chain.
typedef enum class next_type {
  NEXT_APP,       // advance to the next sibling app (skip children)
  FIRST_CHILD,    // descend into the first child middleware
  MATCHING_CHILD  // dispatch to the child whose dispatch_map entry matches the request
} next_type;
// How a route pattern participates in request matching.
typedef enum class route_type {
  REQUEST, // /path/file.ext; from HTTP client
  ALL, // ; route * for middleware_item
  PREFIX, // /path/subpath; route /path/subpath for middleware_item without parameters
  PARAMETERIZED_PREFIX, // /path; route /path/:param1/:param2 for middleware_item with parameters
  REGEX // /path/v([^/]*); route /path/v([^/]*) for middleware_item with regex patterns
} route_type;
// Role of a middleware_item within the chain.
typedef enum class app_type {
  APP,
  DISPATCHER,
  RAW_MIDDLEWARE
} app_type;
// Key describing either an incoming request or a registered route; used
// both inside `request` and as the key of middleware_item::dispatch_map
// (ordered by operator<, declared below).
struct request_key {
  std::string method_;       // HTTP method ("*" entries are used for any-method routes)
  uri_ref uri_;              // request/route URI
  std::size_t prefix_length; // length of the matched prefix within uri_.path
  route_type type;           // how this key is interpreted during matching
};
// One node of the middleware chain. Items are stored in a flat vector and
// linked by vector positions (`index`, `next`, ...), so traversal is done
// with iterator arithmetic (see middleware_controller::next()).
struct middleware_item {
  middleware_index index;         // own position in the middleware vector
  std::string prefix;             // path prefix owned by this item
  std::string method;             // HTTP method this item handles
  std::string path;               // registered route path
  middleware_cb cb;               // middleware callback (req, res)
  middleware_index next;          // position of the next sibling item
  app_type type;                  // role of this item
  app_type parent_type;           // role of the enclosing item
  middleware_index parent_index;  // position of the enclosing item
  middleware_index original_next; // `next` value before chain flattening — TODO confirm against builder code (not in view)
  middleware_index parent_next;   // parent's `next`, used to exit the subtree — TODO confirm against builder code (not in view)
  std::list<middleware_index> children; // positions of child items
  // method -> (request_key -> child index); consulted by
  // middleware_controller::next() for MATCHING_CHILD dispatch.
  std::map<std::string, std::map<request_key, middleware_index>> dispatch_map;
  route_type r_type;              // route pattern kind for this item
  std::size_t prefix_length;      // length of `prefix`
};
#if STREAMING_SUPPORT | |
struct filter_item { | |
filter_item(middleware_index index, response_cb buffered): index(index), buffered(buffered) {} | |
filter_item(middleware_index index, stream_cb streamed, response_cb on_header): index(index), streamed(streamed), on_header(on_header) {} | |
middleware_index index; | |
response_cb buffered; | |
stream_cb streamed; | |
response_cb on_header; | |
}; | |
// Per-generator bookkeeping for NGHTTP2_ERR_DEFERRED propagation through
// the streaming filter pipeline (see the latently_deferred_generator_adaptor
// lambda in response::end(generator_cb) and response::resume()).
struct deferred_status {
  typedef enum {
    ORIGIN = std::string::npos // sentinel index marking the origin (application) generator
  } deferred_index;
  deferred_status(middleware_index index, bool deferred = false, bool resumed = false)
      : index(index), deferred(deferred), resumed(resumed) {}
  middleware_index index; // middleware index of the generator, or ORIGIN
  bool deferred;          // the generator returned DEFERRED and is currently blocked
  bool resumed;           // resume() arrived while the generator was not yet deferred
};
#endif | |
// Base class for per-request helper data stored in request::helper.
// Applications derive from this to attach arbitrary state to a request;
// the virtual destructor makes deletion through the base pointer safe.
class helper_base {
public:
  virtual ~helper_base() {}
};
inline bool operator<(const request_key &a, const request_key &b); | |
class request { | |
public: | |
request(const h2s::request &req) | |
: req(req), | |
key(request_key{req.method(), req.uri(), 0, route_type::REQUEST}), | |
is_matched(false), | |
header_(req.header()), | |
body() { | |
#if INSTANTIATION_LOG | |
std::cerr << "Constructed request " << key.uri_.raw_path << std::endl; | |
#endif | |
} | |
~request() { | |
#if INSTANTIATION_LOG | |
std::cerr << "Destructed request " << key.uri_.raw_path << std::endl; | |
#endif | |
} | |
header_map &header() { | |
return header_; // non-const header_map & | |
} | |
const std::string &method() const { | |
return key.method_; | |
} | |
uri_ref &uri() { | |
return key.uri_; // non-const uri_ref & | |
} | |
void on_data(data_cb cb) { | |
req.on_data(std::move(cb)); | |
} | |
void next(uint32_t err = 0, next_type type = next_type::NEXT_APP); | |
bool match(); // side effects: update req.key.prefix_length and req.is_matched on matching | |
request_key key; | |
bool is_matched; | |
header_map header_; | |
std::string body; // raw request body | |
std::shared_ptr<helper_base> helper; | |
const h2s::request &req; | |
std::shared_ptr<middleware_controller> controller; | |
}; | |
class response { | |
public: | |
response(const h2s::response &res) | |
: res(res), | |
is_header_written(false), | |
status_code_(0), | |
is_trailer_expected(false), | |
#if STREAMING_SUPPORT | |
force_buffered_filtering(false), | |
#endif | |
is_push(false) { | |
#if INSTANTIATION_LOG | |
std::cerr << "Constructed response" << std::endl; | |
#endif | |
setup_on_close(); | |
} | |
response(response &initiator, const h2s::response &res, std::string method, std::string raw_path_query, header_map header) | |
: res(res), | |
is_header_written(false), | |
status_code_(0), | |
is_reading_body(false), | |
response_filters(initiator.response_filters), | |
is_trailer_expected(false), | |
#if STREAMING_SUPPORT | |
force_buffered_filtering(initiator.force_buffered_filtering), | |
#endif | |
is_push(true), | |
push_method(method), | |
push_raw_path_query(raw_path_query), | |
push_header(header) { | |
#if INSTANTIATION_LOG | |
std::cerr << "Constructed response for push " << push_raw_path_query << std::endl; | |
#endif | |
#if STREAMING_LOG | |
std::cerr << "[response::response() for push]: response_filters.size() = " << response_filters.size() << std::endl; | |
#endif | |
setup_on_close(); | |
} | |
~response() { | |
#if INSTANTIATION_LOG | |
if (is_push) { | |
std::cerr << "Destructed response for push " << push_raw_path_query << std::endl; | |
} | |
else { | |
std::cerr << "Destructed response" << std::endl; | |
} | |
#endif | |
} | |
// from nghttp2/src/http2.cc | |
bool expect_response_body(int status_code) { | |
return status_code == 101 || | |
(status_code / 100 != 1 && status_code != 304 && status_code != 204); | |
} | |
// from nghttp2/src/http2.cc | |
bool expect_response_body(const std::string &method, int status_code) { | |
return method != "HEAD" && expect_response_body(status_code); | |
} | |
void write_head(unsigned int status_code, header_map h = header_map{}); | |
unsigned int status_code() { | |
return status_code_; | |
} | |
header_map &header() { | |
return header_; | |
} | |
header_map &trailer() { | |
return trailer_; | |
} | |
std::shared_ptr<response> push(boost::system::error_code &ec, | |
std::string method, | |
std::string raw_path_query, | |
header_map header = header_map{}) { | |
auto push_it = push_filters.cbegin(); | |
auto push_end = push_filters.cend(); | |
while (push_it != push_end) { | |
(*push_it)(*this, method, raw_path_query, header); | |
push_it++; | |
} | |
const h2s::response *raw_push_res = res.push(ec, method, raw_path_query, header); | |
if (ec) { | |
return nullptr; | |
} | |
auto push_res = std::make_shared<response>(*this, *raw_push_res, method, raw_path_query, header); // TODO: is is inefficient to copy every element of response_filters | |
push_res->controller = controller; | |
push_responses.emplace(raw_path_query, push_res); | |
return push_res; | |
} | |
void end(std::string data = ""); | |
void next(generator_cb::result_type err = 0) { | |
if (err < 0) { | |
res.end([err](uint8_t *buf, std::size_t len, uint32_t *data_flags)->ssize_t { | |
return err; | |
}); | |
} | |
else { | |
it++; | |
if (it != response_filters.cend()) { | |
#if STREAMING_SUPPORT | |
if (!it->buffered) { | |
(wrap_stream_cb(it->streamed, it->on_header))(*this); | |
} | |
else { | |
(it->buffered)(*this); | |
} | |
#else // !STREAMING_SUPPORT | |
(*it)(*this); | |
#endif // !STREAMING_SUPPORT | |
} | |
else { | |
respond_body(*this); | |
} | |
} | |
} | |
response_cb buffer_body(generator_cb cb, std::size_t buflen = 16384) { // streaming is not supported | |
auto ss = std::make_shared<std::ostringstream>(); | |
auto isEOF = std::make_shared<bool>(false); | |
return [cb, buflen, ss, isEOF](response &res) { | |
#if STREAMING_LOG | |
std::cerr << "[response::buffer_body callback] called; isEOF = " << *isEOF << " ss.str().size() = " << ss->str().size() << std::endl; | |
#endif | |
uint32_t data_flags; | |
ssize_t upstream_bytes = 0; | |
if (!*isEOF) { | |
std::vector<uint8_t> buf(buflen); | |
upstream_bytes = 0; | |
while (upstream_bytes >= 0) { | |
data_flags = NGHTTP2_DATA_FLAG_NONE; | |
upstream_bytes = cb(buf.data(), buf.size(), &data_flags); | |
if (upstream_bytes > 0) { | |
ss->write(reinterpret_cast<const char *>(buf.data()), upstream_bytes); | |
} | |
if (upstream_bytes < 0) { | |
break; | |
} | |
else if (data_flags & NGHTTP2_DATA_FLAG_EOF) { | |
*isEOF = true; | |
if (data_flags & NGHTTP2_DATA_FLAG_NO_END_STREAM) { | |
res.is_trailer_expected = true; | |
} | |
break; | |
} | |
} | |
} | |
if (*isEOF) { | |
res.body = ss->str(); | |
res.is_reading_body = false; | |
if (res.it != res.response_filters.cend()) { | |
#if STREAMING_SUPPORT | |
if (!res.it->buffered) { | |
(res.wrap_stream_cb(res.it->streamed, res.it->on_header))(res); // next() on wrapped stream_cb | |
} | |
else { | |
(res.it->buffered)(res); // next() | |
} | |
#else // !STREAMING_SUPPORT | |
(*res.it)(res); // next() | |
#endif // !STREAMING_SUPPORT | |
} | |
else { | |
res.respond_body(res); | |
} | |
return; | |
} | |
else if (upstream_bytes == NGHTTP2_ERR_DEFERRED) { | |
return; // wait for asynchronous resume | |
} | |
else if (upstream_bytes == NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE) { | |
res.is_reading_body = false; | |
res.next(NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE); | |
return; | |
} | |
}; | |
} | |
response_cb start_response(); | |
void end(generator_cb cb) { | |
#if STREAMING_LOG | |
std::cerr << "end(generator_cb): response_filters.size() = " << response_filters.size() << std::endl; | |
#endif | |
if (response_filters.size() == 0 && trailer_filters.size() == 0) { | |
if (!is_header_written) { | |
is_header_written = true; | |
res.write_head(status_code(), header()); | |
} | |
res.end(cb); // bypass empty filters | |
return; | |
} | |
#if STREAMING_SUPPORT | |
else if (!force_buffered_filtering && response_filters.size() > 0) { | |
// streaming | |
#if STREAMING_LOG | |
std::cerr << "end(generator_cb): streaming" << std::endl; | |
#endif | |
auto latently_deferred_generator_adaptor = [this](middleware_index index, generator_cb cb)->generator_cb { | |
auto latently_deferred = std::make_shared<deferred_status>(index, false); | |
this->latently_deferred_status.push_back(latently_deferred); // register the latently_deferred object for the cb | |
generator_cb wrapped_cb = [this, latently_deferred, cb](uint8_t *buf, std::size_t len, uint32_t *data_flags)->ssize_t { | |
if (latently_deferred->deferred) { | |
#if STREAMING_LOG | |
std::cerr << "[latently_deferred_generator_adaptor] index = " | |
<< (latently_deferred->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred->index)) | |
<< " deferred = true; returning DEFER" << std::endl; | |
#endif | |
return NGHTTP2_ERR_DEFERRED; // block a call from the sink by not calling its source as it is still latently deferred | |
} | |
else { | |
// not latently deferred | |
#if STREAMING_LOG | |
std::cerr << "[latently_deferred_generator_adaptor] index = " | |
<< (latently_deferred->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred->index)) | |
<< " deferred = false" << std::endl; | |
#endif | |
auto _ret = cb(buf, len, data_flags); // call the source | |
if (_ret == NGHTTP2_ERR_DEFERRED) { | |
// the source is deferred | |
#if STREAMING_LOG | |
std::cerr << "[latently_deferred_generator_adaptor] index = " | |
<< (latently_deferred->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred->index)) | |
<< " the source returned DEFER" << std::endl; | |
#endif | |
if (latently_deferred->resumed) { | |
// the source has already been resumed; skip deferring | |
#if STREAMING_LOG | |
std::cerr << "[latently_deferred_generator_adaptor] index = " | |
<< (latently_deferred->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred->index)) | |
<< " resumed = true; returing 0" << std::endl; | |
#endif | |
latently_deferred->resumed = false; // update the status | |
_ret = 0; // return 0 to avoid suspending HTTP/2 layer | |
} | |
else { | |
#if STREAMING_LOG | |
std::cerr << "[latently_deferred_generator_adaptor] index = " | |
<< (latently_deferred->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred->index)) | |
<< " resumed = false; set deferred = true; returing DEFER" << std::endl; | |
#endif | |
latently_deferred->deferred = true; // update the status | |
} | |
} | |
return _ret; | |
} | |
}; | |
return wrapped_cb; | |
}; | |
generator_cb stream_generator_cb = latently_deferred_generator_adaptor(deferred_status::ORIGIN, cb); | |
for (auto item: response_filters) { | |
#if STREAMING_LOG | |
std::cerr << "end(generator_cb): stream_generator_cb = latently_deferred_generator_adaptor(item.index = " | |
<< item.index | |
<< ", (item.streamed)(*this, stream_generator_cb))" << std::endl; | |
#endif | |
stream_generator_cb = latently_deferred_generator_adaptor(item.index, (item.streamed)(*this, stream_generator_cb)); | |
} | |
if (!is_header_written) { | |
auto it = response_filters.cbegin(); | |
auto end = response_filters.cend(); | |
while (it != end) { | |
if (it->on_header) { | |
(it->on_header)(*this); // synchronous call | |
} | |
it++; | |
} | |
is_header_written = true; | |
#if STREAMING_LOG | |
std::cerr << "end(generator_cb): res.write_head()" << std::endl; | |
#endif | |
res.write_head(status_code(), header()); | |
} | |
#if STREAMING_LOG | |
std::cerr << "end(generator_cb): res.end(stream_generator_cb)" << std::endl; | |
#endif | |
res.end(stream_generator_cb); // start streaming | |
return; | |
} | |
else { | |
// buffered | |
#if STREAMING_LOG | |
std::cerr << "end(generator_cb): buffered" << std::endl; | |
#endif | |
read_body = buffer_body(cb); | |
respond_body = start_response(); | |
it = response_filters.begin(); | |
is_reading_body = true; | |
read_body(*this); | |
} | |
#else // !STREAMING_SUPPORT | |
read_body = buffer_body(cb); | |
respond_body = start_response(); | |
it = response_filters.cbegin(); | |
is_reading_body = true; | |
read_body(*this); | |
#endif // !STREAMING_SUPPORT | |
} | |
void write_trailer(header_map h = header_map{}) { | |
trailer_ = std::move(h); | |
#if STREAMING_SUPPORT | |
if (!force_buffered_filtering && response_filters.size() > 0) { | |
// streaming | |
is_trailer_expected = true; | |
do_write_trailer(); | |
} | |
#endif | |
} | |
void do_write_trailer() { | |
if (is_trailer_expected) { | |
auto it_ = trailer_filters.cbegin(); | |
auto cend = trailer_filters.cend(); | |
while (it_ != cend) { | |
(*it_)(*this); // synchronous | |
it_++; | |
} | |
res.write_trailer(std::move(trailer_)); | |
} | |
} | |
#if STREAMING_SUPPORT | |
// wrap stream_cb as response_cb for compatibility; no longer streaming | |
response_cb wrap_stream_cb(stream_cb cb, response_cb header_cb) { | |
auto is_initial = std::make_shared<bool>(true); | |
auto is_eof = std::make_shared<bool>(false); | |
auto left_bytes = std::make_shared<std::size_t>(0); | |
auto src_generator = std::make_shared<generator_cb>(); | |
auto stream_generator = std::make_shared<generator_cb>(); | |
auto buffer = std::make_shared<std::string>(); | |
auto sink = std::make_shared<std::ostringstream>(); | |
const std::size_t buffer_size = 16384; // bytes | |
return [cb, header_cb, is_initial, is_eof, left_bytes, src_generator, stream_generator, buffer, buffer_size, sink](response &res) { | |
if (*is_initial) { | |
#if STREAMING_LOG | |
std::cerr << "[wrapped stream_cb callback]: initial = true" << std::endl; | |
#endif | |
if (header_cb) { | |
header_cb(res); | |
} | |
*is_initial = false; | |
*left_bytes = res.body.size(); | |
#if STREAMING_LOG | |
std::cerr << "[wrapped stream_cb callback]: res.body.size() = " << *left_bytes << std::endl; | |
#endif | |
buffer->resize(buffer_size); | |
*src_generator = [&res, is_eof, left_bytes](uint8_t *dst, std::size_t len, uint32_t *data_flags)->ssize_t { | |
if (*is_eof) { | |
#if STREAMING_LOG | |
std::cerr << "[wrapped stream_cb src_generator callback]: is_eof = " << *is_eof << std::endl; | |
#endif | |
*data_flags |= NGHTTP2_DATA_FLAG_EOF; | |
return (ssize_t)0; | |
} | |
auto bytes = std::min(len, *left_bytes); | |
#if STREAMING_LOG | |
std::cerr << "[wrapped stream_cb src_generator callback]: called len = " << len << " left_bytes = " << *left_bytes << " bytes = " << bytes << std::endl; | |
#endif | |
std::copy_n(res.body.c_str() + (res.body.size() - *left_bytes), bytes, dst); | |
*left_bytes -= bytes; | |
if (*left_bytes == 0) { | |
#if STREAMING_LOG | |
std::cerr << "[wrapped stream_cb src_generator callback]: left_bytes = " << *left_bytes << std::endl; | |
#endif | |
*data_flags |= NGHTTP2_DATA_FLAG_EOF; | |
*is_eof = true; | |
} | |
#if STREAMING_LOG | |
std::cerr << "[wrapped stream_cb src_generator callback]: return bytes = " << bytes << std::endl; | |
#endif | |
return (ssize_t)bytes; | |
}; | |
*stream_generator = cb(res, *src_generator); | |
} | |
uint32_t data_flags = NGHTTP2_DATA_FLAG_NONE; | |
ssize_t bytes; | |
#if STREAMING_LOG | |
std::cerr << "[wrapped stream_cb callback]: calling stream_generator" << std::endl; | |
#endif | |
while ((bytes = (*stream_generator)((uint8_t*)buffer->data(), buffer_size, &data_flags)) > 0) { | |
#if STREAMING_LOG | |
std::cerr << "[wrapped stream_cb callback]: stream_generator bytes = " << bytes << std::endl; | |
#endif | |
sink->write(buffer->data(), bytes); | |
if (data_flags & NGHTTP2_DATA_FLAG_EOF) { | |
#if STREAMING_LOG | |
std::cerr << "[wrapped stream_cb callback]: stream_generator EOF" << std::endl; | |
#endif | |
break; | |
} | |
data_flags = NGHTTP2_DATA_FLAG_NONE; // reset the flag | |
} | |
#if STREAMING_LOG | |
std::cerr << "[wrapped stream_cb callback]: out of stream_generator bytes = " << bytes << std::endl; | |
#endif | |
if (bytes == 0 && (data_flags | NGHTTP2_DATA_FLAG_EOF)) { | |
// fully generated without error | |
#if STREAMING_LOG | |
std::cerr << "[wrapped stream_cb callback]: bytes == 0 & data_flags EOF" << std::endl; | |
#endif | |
res.body = sink->str(); // update response body | |
#if STREAMING_LOG | |
std::cerr << "[wrapped stream_cb callback]: res.body =" << res.body << std::endl; | |
#endif | |
} | |
else if (bytes == NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE) { | |
// error in filtering | |
#if STREAMING_LOG | |
std::cerr << "[wrapped stream_cb callback]: bytes == NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE" << std::endl; | |
#endif | |
res.cancel(); | |
} | |
else if (bytes == NGHTTP2_ERR_DEFERRED) { | |
#if STREAMING_LOG | |
std::cerr << "[wrapped stream_cb callback]: bytes == NGHTTP2_ERR_DEFERRED" << std::endl; | |
#endif | |
// deferred | |
// wait to be recalled until stream_generator calls res.resume() | |
} | |
res.next(); | |
}; | |
} | |
#endif // STREAMING_SUPPORT | |
#if STREAMING_SUPPORT | |
void on_response(response_cb cb); | |
#else // !STREAMING_SUPPORT | |
void on_response(response_cb cb) { | |
response_filters.push_front(std::move(cb)); | |
} | |
#endif // !STREAMING_SUPPORT | |
#if STREAMING_SUPPORT | |
void on_response(response_cb on_header, stream_cb cb); | |
#endif | |
void on_push(push_cb cb) { | |
push_filters.push_front(std::move(cb)); | |
} | |
void on_trailer(response_cb cb) { | |
trailer_filters.push_front(std::move(cb)); | |
} | |
void on_close(close_cb cb) { | |
on_close_callbacks.push_front(std::move(cb)); | |
} | |
void setup_on_close(); | |
void cancel(uint32_t error_code = NGHTTP2_INTERNAL_ERROR) { | |
res.cancel(error_code); | |
} | |
void resume() { | |
if (is_header_written) { | |
#if STREAMING_SUPPORT | |
#if STREAMING_LOG | |
std::cerr << "[response::resume()] is_header_written = true" << std::endl; | |
#endif | |
if (latently_deferred_status.size() > 0) { | |
#if STREAMING_LOG | |
std::cerr << "[response::resume()] latently_deferred_status.size() = " << latently_deferred_status.size() << std::endl; | |
#endif | |
if (latently_deferred_status.front()->deferred) { | |
// lowest generator is deferred; HTTP/2 layer is ready to resume | |
#if STREAMING_LOG | |
std::cerr << "[response::resume()] latently_deferred_status.front()->index = " | |
<< (latently_deferred_status.front()->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred_status.front()->index)) | |
<< " deferred = true" << std::endl; | |
#endif | |
auto it = latently_deferred_status.begin(); | |
auto end = latently_deferred_status.end(); | |
while (it != end) { | |
#if STREAMING_LOG | |
std::cerr << "[response::resume()] latently_deferred_status it->index = " | |
<< ((*it)->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string((*it)->index)) | |
<< " deferred = " << (*it)->deferred << " resumed = " << (*it)->resumed << std::endl; | |
#endif | |
(*it)->deferred = false; // reset deferred status | |
(*it)->resumed = false; // reset resumed status | |
it++; | |
} | |
#if STREAMING_LOG | |
std::cerr << "[response::resume()] calling res.resume()" << std::endl; | |
#endif | |
res.resume(); | |
} | |
else { | |
// lowest generator is not deferred; deferred status is still latent | |
#if STREAMING_LOG | |
std::cerr << "[response::resume()] latently_deferred_status.front()->index = " | |
<< (latently_deferred_status.front()->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred_status.front()->index)) | |
<< " deferred = false" << std::endl; | |
std::cerr << "[response::resume()] latently_deferred_status.front()->index = " | |
<< (latently_deferred_status.front()->index == deferred_status::ORIGIN ? "ORIGIN" : std::to_string(latently_deferred_status.front()->index)) | |
<< " setting resumed = true" << std::endl; | |
#endif | |
latently_deferred_status.front()->resumed = true; | |
} | |
} | |
else { | |
// latently_deferred_status is empty | |
res.resume(); | |
} | |
#else // !STREAMING_SUPPORT | |
res.resume(); | |
#endif // !STREAMING_SUPPORT | |
} | |
else { | |
#if STREAMING_SUPPORT | |
if (!is_reading_body && it != response_filters.end()) { | |
if (!it->buffered) { | |
it->buffered = wrap_stream_cb(it->streamed, it->on_header); | |
} | |
(it->buffered)(*this); // resume reading from source | |
} | |
#else // !STREAMING_SUPPORT | |
if (!is_reading_body && it != response_filters.cend()) { | |
(*it)(*this); // resume reading from source | |
} | |
#endif // !STREAMING_SUPPORT | |
else { | |
read_body(*this); | |
} | |
} | |
} | |
bool is_header_written; | |
unsigned int status_code_; | |
header_map header_; | |
std::string body; // streaming not supported | |
bool is_trailer_expected; | |
header_map trailer_; | |
bool is_push; | |
std::string push_method; | |
std::string push_raw_path_query; | |
header_map push_header; | |
std::list<close_cb> on_close_callbacks; | |
response_cb read_body; | |
bool is_reading_body; | |
response_cb respond_body; | |
#if STREAMING_SUPPORT | |
bool force_buffered_filtering; | |
std::list<filter_item> response_filters; | |
std::list<filter_item>::iterator it; | |
std::list<std::shared_ptr<deferred_status>> latently_deferred_status; | |
#else // !STREAMING_SUPPORT | |
std::list<response_cb> response_filters; | |
std::list<response_cb>::const_iterator it; | |
#endif // !STREAMING_SUPPORT | |
std::list<response_cb> trailer_filters; | |
std::list<push_cb> push_filters; | |
std::map<std::string, std::shared_ptr<response>> push_responses; | |
const h2s::response &res; | |
std::shared_ptr<middleware_controller> controller; | |
}; | |
class middleware_controller { | |
public: | |
static request_cb handler(std::shared_ptr<middleware_vector> chain) { | |
if (chain->size() < 1) { | |
return [](const h2s::request &req_original, const h2s::response &res_original){ | |
res_original.cancel(); | |
}; | |
} | |
return [chain](const h2s::request &req_original, const h2s::response &res_original){ | |
auto controller = std::make_shared<middleware_controller>(req_original, res_original, chain); | |
controller->set_controller(controller); | |
controller->it = controller->request_filters->cbegin(); | |
(*controller->it).cb(controller->req, controller->res); | |
}; | |
} | |
public: | |
middleware_controller(const h2s::request &req_, const h2s::response &res_, std::shared_ptr<middleware_vector> chain) | |
: req(req_), res(res_), request_filters(chain) { | |
#if INSTANTIATION_LOG | |
std::cerr << "Constructed middleware_controller " << req.key.uri_.path << std::endl; | |
#endif | |
} | |
~middleware_controller() { | |
#if INSTANTIATION_LOG | |
std::cerr << "Destructed middleware_controller " << req.key.uri_.path << std::endl; | |
#endif | |
} | |
void set_controller(std::shared_ptr<middleware_controller> controller) { | |
req.controller = controller; | |
res.controller = controller; | |
} | |
// Advance the middleware cursor `it` and invoke the next middleware.
// err > 0 aborts the chain by cancelling the response.  Otherwise `type`
// selects the traversal step:
//  - NEXT_APP:       jump to this item's precomputed sibling index (`next`)
//  - FIRST_CHILD:    step to the immediately following item in the vector
//  - MATCHING_CHILD: look up the matching child in the dispatcher's
//                    dispatch_map (exact method first, then the "*" map)
void next(uint32_t err = 0, next_type type = next_type::NEXT_APP) {
    if (err > 0) {
        res.cancel(err);
    }
    else {
        // `index` is the current position; `next` (shadows the method name)
        // starts as the default fall-through target and may be replaced by a
        // dispatch-map hit below.
        auto index = it->index;
        auto next = it->next;
        switch (type) {
        case next_type::NEXT_APP:
            std::advance(it, next - index);
            break;
        case next_type::FIRST_CHILD:
            it++;
            break;
        case next_type::MATCHING_CHILD:
            // NOTE: declaration without case braces is legal only because this
            // is the last case label (no later label can jump over the init).
            std::map<std::string, std::map<request_key, middleware_index>> &dispatch_map = (*request_filters)[index].dispatch_map;
#if MATCHING_LOG
            std::cerr << "next(MATCHING_CHILD) from " << it->index << ":" << it->path << " for " << req.key.uri_.path << " with prefix " << req.key.uri_.path.substr(0, req.key.prefix_length) << " " << req.key.prefix_length << std::endl;
#endif
            // 1) try the map keyed by the request's own method
            auto method_it = dispatch_map.find(req.key.method_);
            if (method_it != dispatch_map.end()) {
#if MATCHING_LOG
                std::cerr << "next(MATCHING_CHILD) found method " << req.key.method_ << " from " << index << ":" << (*request_filters)[index].path
                          << " req.key.prefix_length=" << req.key.prefix_length << " " << req.key.uri_.path.substr(0, req.key.prefix_length) << std::endl;
#endif
                // map lookup uses key_less, which treats a route-prefix match
                // as equality, so find() performs prefix matching
                auto child_it = method_it->second.find(req.key);
                if (child_it != method_it->second.end()) {
                    next = child_it->second;
#if MATCHING_LOG
                    std::cerr << "next(MATCHING_CHILD) found matching key " << next << ":" << (*request_filters)[next].path << std::endl;
#endif
                    req.is_matched = true;
                }
                else {
#if MATCHING_LOG
                    std::cerr << "next(MATCHING_CHILD) did not find matching key for " << req.key.uri_.path << std::endl;
#endif
                    // 2) method map exists but no route matched: retry under "*"
                    method_it = dispatch_map.find("*");
                    if (method_it != dispatch_map.end()) {
#if MATCHING_LOG
                        // (log prints the request method even though the "*" map matched)
                        std::cerr << "next(MATCHING_CHILD) found method " << req.key.method_ << " from " << index << ":" << (*request_filters)[index].path << std::endl;
#endif
                        child_it = method_it->second.find(req.key);
                        if (child_it != method_it->second.end()) {
                            next = child_it->second;
#if MATCHING_LOG
                            std::cerr << "next(MATCHING_CHILD) found matching key " << next << ":" << (*request_filters)[next].path << std::endl;
#endif
                            req.is_matched = true;
                        }
                    }
                }
            }
            else {
                // no map for this method at all: go straight to the "*" map
                method_it = dispatch_map.find("*");
#if MATCHING_LOG
                // (logged before the end() check; prints even on a miss)
                std::cerr << "next(MATCHING_CHILD) found method " << req.key.method_ << " from " << index << ":" << (*request_filters)[index].path << std::endl;
#endif
                if (method_it != dispatch_map.end()) {
                    auto child_it = method_it->second.find(req.key);
                    if (child_it != method_it->second.end()) {
                        next = child_it->second;
#if MATCHING_LOG
                        std::cerr << "next(MATCHING_CHILD) found matching key " << next << ":" << (*request_filters)[next].path << std::endl;
#endif
                        req.is_matched = true;
                    }
                }
            }
            std::advance(it, next - index);
            break;
        }
        // invoke the middleware we landed on (if we didn't run off the end)
        if (it != request_filters->cend()) {
            req.key.prefix_length = it->prefix.size();
            it->cb(req, res);
        }
    }
}
std::shared_ptr<middleware_vector> request_filters; // flattened middleware vector shared for the request's lifetime
middleware_vector::const_iterator it;               // cursor into *request_filters: the middleware currently running
request req;                                        // wrapped request, exposed to middleware callbacks
response res;                                       // wrapped response, exposed to middleware callbacks
}; | |
class web_app: public std::enable_shared_from_this<web_app> { | |
public: | |
// Strict-weak-ordering comparator for request_key, used by the dispatch
// maps.  The asymmetry is deliberate: an incoming REQUEST key compared
// against a stored route key (PREFIX / PARAMETERIZED_PREFIX) yields
// "equivalent" (neither a<b nor b<a) when the request path, after skipping
// the already-consumed a.prefix_length characters, starts with the route
// prefix at a '/' boundary — so std::map::find() doubles as prefix
// matching.  REGEX and ALL keys are not expected in the maps; the
// comparator just orders them after/before everything else.
static bool key_less(const request_key &a, const request_key &b) {
    std::size_t a_size, b_size;
    int cmp;
    if (a.type == route_type::REQUEST) {
        if (b.type == route_type::REQUEST) {
            // two request keys: plain lexicographic path order
            return a.uri_.path < b.uri_.path;
        }
        else {
            switch (b.type) {
            case route_type::PREFIX:
#if MATCHING_LOG
                if (a.uri_.path.size() < a.prefix_length) {
                    std::cerr << a.uri_.path << " a.uri_.path.size() < a.prefix_length " << a.prefix_length << std::endl;
                }
#endif
                // compare the unconsumed tail of the request path against the
                // whole route path
                a_size = a.uri_.path.size() - a.prefix_length;
                b_size = b.uri_.path.size();
                if (b_size < a_size) {
                    cmp = a.uri_.path.compare(a.prefix_length, b_size,
                                              b.uri_.path, 0, b_size);
                    if (cmp == 0) {
                        if (a.uri_.path[a.prefix_length + b_size] == '/') {
                            // a is a subfolder of b; a matches b
                            return false; // a < b is false
                        }
                        else {
                            // a is not a subfolder of b; a starts with b; a > b
                            return false; // true if a < b
                        }
                    }
                    else {
                        return cmp < 0; // true if a < b
                    }
                }
                else if (b_size == a_size) {
                    cmp = a.uri_.path.compare(a.prefix_length, b_size,
                                              b.uri_.path, 0, b_size);
                    if (cmp == 0) {
                        return false; // a == b; a matches b
                    }
                    else {
                        return cmp < 0;
                    }
                }
                else { // b_size > a_size
                    cmp = a.uri_.path.compare(a.prefix_length, a_size,
                                              b.uri_.path, 0, b_size);
                    return cmp < 0;
                }
                break;
            case route_type::PARAMETERIZED_PREFIX:
#if MATCHING_LOG
                if (a.uri_.path.size() < a.prefix_length) {
                    std::cerr << a.uri_.path << " a.uri_.path.size() < a.prefix_length " << a.prefix_length << std::endl;
                }
#endif
                a_size = a.uri_.path.size() - a.prefix_length;
                b_size = b.prefix_length; // excluding parameters and preceding /
                if (b_size < a_size) {
                    cmp = a.uri_.path.compare(a.prefix_length, b_size,
                                              b.uri_.path, 0, b_size);
                    if (cmp == 0) {
                        if (a.uri_.path[a.prefix_length + b_size] == '/') {
                            // a is a subfolder of b; a matches b
                            return false; // a < b is false
                        }
                        else {
                            // a is not a subfolder of b; a starts with b; a > b
                            return false;
                        }
                    }
                    else {
                        return cmp < 0; // true if a < b
                    }
                }
                else if (b_size == a_size) {
                    cmp = a.uri_.path.compare(a.prefix_length, b_size,
                                              b.uri_.path, 0, b_size);
                    if (cmp == 0) {
                        return false; // a == b; a matches b
                    }
                    else {
                        return cmp < 0;
                    }
                }
                else { // b_size > a_size
                    cmp = a.uri_.path.compare(a.prefix_length, a_size,
                                              b.uri_.path, 0, b_size);
                    return cmp < 0;
                }
                break;
            case route_type::REGEX: // unexpected; any < re
            case route_type::ALL: // unexpected; any < *
            default:
                return true;
            }
        }
    }
    else if (b.type == route_type::REQUEST) { // a.type != REQUEST
        // mirror image of the branch above with the roles of a and b swapped;
        // comparisons are therefore negated (cmp > 0)
        switch (a.type) {
        case route_type::PREFIX:
#if MATCHING_LOG
            if (b.uri_.path.size() < b.prefix_length) {
                std::cerr << b.uri_.path << " b.uri_.path.size() < b.prefix_length " << b.prefix_length << std::endl;
            }
#endif
            a_size = a.uri_.path.size();
            b_size = b.uri_.path.size() - b.prefix_length;
            if (a_size < b_size) {
                cmp = b.uri_.path.compare(b.prefix_length, a_size,
                                          a.uri_.path, 0, a_size);
                if (cmp == 0) {
                    if (b.uri_.path[b.prefix_length + a_size] == '/') {
                        // b is a subfolder of a; b matches a
                        return false; // a < b is false
                    }
                    else {
                        // b is not a subfolder of a; b starts with a; a < b
                        return true; // true if a < b
                    }
                }
                else {
                    return cmp > 0; // true if a < b
                }
            }
            else if (b_size == a_size) {
                cmp = b.uri_.path.compare(b.prefix_length, a_size,
                                          a.uri_.path, 0, a_size);
                if (cmp == 0) {
                    return false; // a == b; b matches a
                }
                else {
                    return cmp > 0;
                }
            }
            else { // a_size > b_size
                cmp = b.uri_.path.compare(b.prefix_length, a_size,
                                          a.uri_.path, 0, a_size);
                return cmp > 0;
            }
            break;
        case route_type::PARAMETERIZED_PREFIX:
#if MATCHING_LOG
            if (b.uri_.path.size() < b.prefix_length) {
                std::cerr << b.uri_.path << " b.uri_.path.size() < b.prefix_length " << b.prefix_length << std::endl;
            }
#endif
            a_size = a.prefix_length; // excluding parameters and preceding /
            b_size = b.uri_.path.size() - b.prefix_length;
            cmp = b.uri_.path.compare(b.prefix_length, a_size,
                                      a.uri_.path, 0, a_size);
            if (a_size < b_size) {
                if (cmp == 0) {
                    if (b.uri_.path[b.prefix_length + a_size] == '/') {
                        // b is a subfolder of a; b matches a
                        return false; // a < b is false
                    }
                    else {
                        // b is not a subfolder of a; b starts with a; a < b
                        return true;
                    }
                }
                else {
                    return cmp > 0; // true if a < b
                }
            }
            else if (a_size == b_size) {
                if (cmp == 0) {
                    return false; // a == b; a matches b
                }
                else {
                    return cmp > 0;
                }
            }
            else { // a_size > b_size
                return cmp > 0;
            }
            break;
        case route_type::REGEX: // unexpected; any < re
        case route_type::ALL: // unexpected; any < *
        default:
            return false;
        }
    }
    else { // a != REQUEST && b != REQUEST
        // route-vs-route ordering (map insertion): lexicographic on the
        // significant prefix of each route
        switch (a.type) {
        case route_type::PREFIX:
            a_size = a.uri_.path.size();
            switch (b.type) {
            case route_type::PREFIX:
                b_size = b.uri_.path.size();
                cmp = a.uri_.path.compare(0, a_size,
                                          b.uri_.path,
                                          0, b_size);
                return cmp < 0;
            case route_type::PARAMETERIZED_PREFIX:
                b_size = b.prefix_length;
                cmp = a.uri_.path.compare(0, a_size,
                                          b.uri_.path,
                                          0, b_size);
                return cmp < 0;
            case route_type::REGEX: // unexpected; any < re
            case route_type::ALL: // unexpected; any < *
            default:
                return true;
            }
        case route_type::PARAMETERIZED_PREFIX:
            a_size = a.prefix_length;
            switch (b.type) {
            case route_type::PREFIX:
                b_size = b.uri_.path.size();
                cmp = a.uri_.path.compare(0, a_size,
                                          b.uri_.path,
                                          0, b_size);
                return cmp < 0;
            case route_type::PARAMETERIZED_PREFIX:
                b_size = b.prefix_length;
                cmp = a.uri_.path.compare(0, a_size,
                                          b.uri_.path,
                                          0, b_size);
                return cmp < 0;
            case route_type::REGEX: // unexpected: any < re
            case route_type::ALL: // unexpected: any < *
            default:
                return true;
            }
        case route_type::REGEX: // unexpected
        case route_type::ALL: // unexpected
        default:
            switch (b.type) {
            case route_type::PREFIX: // unexpected: any < re/*
            case route_type::PARAMETERIZED_PREFIX: // unexpected: any < re/*
                return false;
            case route_type::REGEX: // unexpected
            case route_type::ALL: // unexpected
            default:
                return a.uri_.path < b.uri_.path;
            }
        }
    }
};
static std::shared_ptr<web_app> create() { | |
return std::make_shared<web_app>(); | |
} | |
static std::shared_ptr<web_app> copy(std::shared_ptr<web_app> original) { | |
return std::make_shared<web_app>(original)->copy_app_chain(original); | |
} | |
public: | |
web_app() | |
: type(app_type::APP), | |
method_("*"), | |
path_("*"), | |
app_chain(std::make_shared<std::list<std::shared_ptr<web_app>>>()), | |
parent_() { | |
#if INSTANTIATION_LOG | |
std::cerr << "Constructed web_app " << path_ << std::endl; | |
#endif | |
} | |
// Construct a node of explicit kind (APP / DISPATCHER / RAW_MIDDLEWARE)
// for `method` and `path`, optionally parented.
// FIX: initializer list reordered to match member declaration order
// (path_ is declared before method_), eliminating the -Wreorder warning.
web_app(app_type type, std::string method, std::string path, std::shared_ptr<web_app> parent = nullptr)
    : type(type),
      path_(path),
      method_(method),
      app_chain(std::make_shared<std::list<std::shared_ptr<web_app>>>()),
      parent_(parent) {
#if INSTANTIATION_LOG
    std::cerr << "Constructed web_app " << path_ << std::endl;
#endif
}
web_app(std::string method, std::string path, middleware_cb cb, std::shared_ptr<web_app> parent = nullptr) | |
: type(app_type::RAW_MIDDLEWARE), | |
method_(method), | |
path_(path), | |
cb(cb), | |
app_chain(std::make_shared<std::list<std::shared_ptr<web_app>>>()), | |
parent_(parent) { | |
#if INSTANTIATION_LOG | |
std::cerr << "Constructed web_app " << path_ << std::endl; | |
#endif | |
} | |
web_app(const std::shared_ptr<web_app> original) | |
: type(original->type), | |
method_(original->method_), | |
path_(original->path_), | |
cb(original->cb), | |
app_chain(), | |
parent_() { | |
#if INSTANTIATION_LOG | |
std::cerr << "Constructed web_app " << path_ << " by copying" << std::endl; | |
#endif | |
} | |
// Destructor: trace-only; all owned state is released by smart pointers.
~web_app() {
#if INSTANTIATION_LOG
    std::cerr << "Destructed web_app " << path_ << std::endl;
#endif
}
std::shared_ptr<web_app> ptr() { | |
return shared_from_this(); | |
} | |
std::shared_ptr<web_app> copy_app_chain(const std::shared_ptr<web_app> original) { | |
app_chain = std::make_shared<std::list<std::shared_ptr<web_app>>>(); | |
auto it = original->app_chain->cbegin(); | |
while (it != original->app_chain->cend()) { | |
auto clone = std::make_shared<web_app>(*it); | |
clone->copy_app_chain(*it); | |
clone->parent_ = shared_from_this(); | |
app_chain->push_back(clone); | |
it++; | |
} | |
return shared_from_this(); | |
} | |
std::size_t get_prefix_length() { | |
auto param_it = path_.find("/:"); | |
std::size_t prefix_length__ = 0; | |
if (param_it != std::string::npos) { | |
prefix_length__ = param_it; | |
} | |
else if (path_.find("(") != std::string::npos) { // TODO ( is too ambiguous | |
prefix_length__ = path_.size(); // TODO: | |
} | |
else if (path_ == "*") { | |
prefix_length__ = 0; | |
} | |
else { | |
prefix_length__ = path_.size(); | |
} | |
return prefix_length__; | |
} | |
void set_prefix(std::string parent_prefix) { | |
prefix = parent_prefix; | |
std::string my_prefix; | |
auto it = app_chain->begin(); | |
switch (type) { | |
case app_type::APP: | |
case app_type::DISPATCHER: | |
my_prefix = parent_prefix + path_.substr(0, get_prefix_length()); | |
while (it != app_chain->end()) { | |
(*it)->set_prefix(my_prefix); | |
it++; | |
} | |
break; | |
case app_type::RAW_MIDDLEWARE: | |
default: | |
break; | |
} | |
} | |
// Compile this app tree into a single nghttp2-asio request callback.
// Phases:
//  1. normalize the mount path and propagate prefixes (set_prefix)
//  2. flatten the tree into a middleware_chain and make `next` absolute
//  3. walk the chain with a stack of enclosing APP/DISPATCHER frames to
//     compute parent links and rewrite `next` targets
//  4. snapshot into a vector, classify each route (PREFIX / PARAMETERIZED /
//     REGEX / ALL), and build per-method dispatch maps for DISPATCHERs
//  5. replicate "*"-method routes into every concrete method map
//  6. (optionally) dump the table, then hand the vector to the controller
operator request_cb() {
    if (type == app_type::APP) {
        if (path_.size() > 0 && path_[path_.size() - 1] == '/') {
            path_ = path_.substr(0, path_.size() - 1); // remove trailing / from the mount point path_
        }
    }
    set_prefix(path_);
    auto chain = (std::shared_ptr<middleware_chain>)*this;
    if (type == app_type::APP) {
        chain->pop_front(); // pop self app;
    }
    // assign sequential indices and turn the relative `next` offsets into
    // absolute chain indices
    auto it = chain->begin();
    middleware_index index = 0;
    while (it != chain->end()) {
        it->index = index;
        it->next = it->index + it->next;
        it++;
        index++;
    }
    // one stack frame per enclosing APP/DISPATCHER while walking the chain
    struct quadple {
        app_type type;
        middleware_index index;
        middleware_index original_next;
        middleware_index updated_next;
    };
    std::list<quadple> parent_type_stack;
    // sentinel frame: a virtual APP whose span is the whole chain
    parent_type_stack.push_front(quadple{ app_type::APP, chain->size(), chain->size(), chain->size() });
    it = chain->begin();
    while (it != chain->end()) {
        auto next_it = it;
        next_it++;
        it->parent_type = parent_type_stack.front().type;
        it->parent_index = parent_type_stack.front().index;
        it->original_next = it->next;
        it->parent_next = parent_type_stack.front().updated_next;
        std::list<quadple>::iterator parent_type_stack_it;
        switch (it->parent_type) {
        case app_type::DISPATCHER:
            // children of a dispatcher fall through to the dispatcher's own
            // next target (the dispatcher jumps to them via dispatch_map)
            it->next = it->parent_next;
            // NOTE(review): the iterator computed by this loop is never used
            // afterwards — looks like leftover/dead code; confirm intent
            parent_type_stack_it = parent_type_stack.begin();
            while (parent_type_stack_it != parent_type_stack.end() && parent_type_stack_it->index < chain->size()) {
                parent_type_stack_it++;
                if (parent_type_stack_it->index >= it->next) {
                    break;
                }
            }
            break;
        case app_type::APP:
            // the last child of an APP inherits the APP's next target
            if (next_it == chain->end() || next_it->index >= parent_type_stack.front().original_next) {
                it->next = it->parent_next;
            }
            if (it->next == it->parent_next) {
                // NOTE(review): same unused-iterator pattern as above
                parent_type_stack_it = parent_type_stack.begin();
                while (parent_type_stack_it != parent_type_stack.end() && parent_type_stack_it->index < chain->size()) {
                    parent_type_stack_it++;
                    if (parent_type_stack_it->index >= it->next) {
                        break;
                    }
                }
            }
            break;
        case app_type::RAW_MIDDLEWARE:
            break;
        default:
            break;
        }
        // entering a nested APP/DISPATCHER: push a frame for its span
        switch (it->type) {
        case app_type::APP:
        case app_type::DISPATCHER:
            parent_type_stack.push_front(quadple{ it->type, it->index, it->original_next, it->next });
            break;
        case app_type::RAW_MIDDLEWARE:
        default:
            break;
        }
        // pop every frame whose span ends at the next item
        while (parent_type_stack.size() > 0 && it->index + 1 >= parent_type_stack.front().original_next) {
            parent_type_stack.pop_front();
        }
        it++;
    }
    // snapshot the list into a random-access vector for O(1) dispatch
    auto vector_ = std::make_shared<middleware_vector>(chain->begin(), chain->end());
    for (middleware_index index = 0; index < vector_->size(); index++) {
        if ((*vector_)[index].parent_index < vector_->size()) {
            (*vector_)[(*vector_)[index].parent_index].children.push_back(index);
        }
        // classify the route and compute its literal prefix length
        auto param_it = (*vector_)[index].path.find("/:");
        std::size_t prefix_length = 0;
        route_type r_type;
        if (param_it != std::string::npos) {
            r_type = route_type::PARAMETERIZED_PREFIX;
            prefix_length = param_it;
        }
        else if ((*vector_)[index].path.find("(") != std::string::npos) { // TODO ( is too ambiguous
            r_type = route_type::REGEX;
        }
        else if ((*vector_)[index].path == "*") {
            r_type = route_type::ALL;
        }
        else {
            r_type = route_type::PREFIX;
        }
        (*vector_)[index].r_type = r_type;
        (*vector_)[index].prefix_length = prefix_length;
    }
    // build each DISPATCHER's method -> { route key -> child index } map
    for (middleware_index index = 0; index < vector_->size(); index++) {
        if ((*vector_)[index].type == app_type::DISPATCHER) {
            std::for_each((*vector_)[index].children.cbegin(), (*vector_)[index].children.cend(), [vector_, index](middleware_index child_it) {
                auto method_it = (*vector_)[index].dispatch_map.find((*vector_)[child_it].method);
                if (method_it == (*vector_)[index].dispatch_map.end()) {
                    std::map<request_key, middleware_index> method_map;
                    (*vector_)[index].dispatch_map.emplace((*vector_)[child_it].method, std::move(method_map));
                }
                // re-find because emplace above may have invalidated nothing,
                // but method_it may still point at end()
                method_it = (*vector_)[index].dispatch_map.find((*vector_)[child_it].method);
                if (method_it == (*vector_)[index].dispatch_map.end()) {
                    std::cerr << "failed to emplace in dispatch_map" << std::endl;
                    return;
                }
                std::size_t prefix_length = (*vector_)[child_it].prefix_length;
                route_type r_type = (*vector_)[child_it].r_type;
                if (r_type == route_type::PREFIX || r_type == route_type::PARAMETERIZED_PREFIX) {
                    // scheme/host are placeholders; only path/method matter
                    // for key_less comparisons
                    method_it->second.emplace(request_key{
                            (*vector_)[child_it].method,
                            uri_ref{
                                "https",
                                "localhost",
                                (*vector_)[child_it].path,
                                (*vector_)[child_it].path,
                                "",
                                ""
                            },
                            prefix_length,
                            r_type
                        },
                        (*vector_)[child_it].index
                    );
                }
                else {
                    // TODO: ALL and REGEX
                }
            });
            // copy every "*"-method route into each concrete method map so a
            // single lookup per request method suffices
            auto all_method_it = (*vector_)[index].dispatch_map.find("*");
            if (all_method_it != (*vector_)[index].dispatch_map.end()) {
                std::for_each(all_method_it->second.begin(), all_method_it->second.end(), [vector_, index](std::pair<request_key, middleware_index> child_it) {
                    std::for_each((*vector_)[index].dispatch_map.begin(), (*vector_)[index].dispatch_map.end(),
                        [vector_, index, &child_it](std::pair<std::string, std::map<request_key, middleware_index>> method_it_){
                            if (method_it_.first != "*") {
                                //std::cerr << "emplacing " << child_it.first.uri_.path << " into " << method_it_.first << std::endl;
                                // method_it_ is a copy; find the real map entry
                                // before inserting
                                auto method_it__ = (*vector_)[index].dispatch_map.find(method_it_.first);
                                method_it__->second.emplace(request_key{
                                        method_it_.first,
                                        child_it.first.uri_,
                                        child_it.first.prefix_length,
                                        child_it.first.type
                                    },
                                    child_it.second
                                );
                            }
                        });
                });
            }
        }
    }
    // human-readable dump of the compiled table (emitted only when
    // DUMP_MIDDLEWARE_ITEM is defined)
    std::ostringstream oss;
    std::for_each(vector_->cbegin(), vector_->cend(), [&oss](middleware_item item) {
        oss << "middleware_item { index: " << item.index
            << ", method: " << item.method
            << ", prefix: " << item.prefix
            << ", path: " << item.path
            << ", next: " << item.next
            << ", type:" << [](app_type type)->std::string {
                switch (type) {
                case app_type::APP: return "APP";
                case app_type::DISPATCHER: return "DISPATCHER";
                case app_type::RAW_MIDDLEWARE: return "RAW_MIDDLEWARE";
                default: return std::to_string((int)type);
                }}(item.type)
            << ", parent_type:" << [](app_type type)->std::string {
                switch (type) {
                case app_type::APP: return "APP";
                case app_type::DISPATCHER: return "DISPATCHER";
                case app_type::RAW_MIDDLEWARE: return "RAW_MIDDLEWARE";
                default: return std::to_string((int)type);
                }}(item.parent_type)
            << ", parent_index: " << item.parent_index
            << ", parent_next: " << item.parent_next
            << ", children ";
        bool first = true;
        oss << "[ ";
        std::for_each(item.children.cbegin(), item.children.cend(), [&first, &oss](middleware_index child){
            if (first) {
                first = false;
            }
            else {
                oss << ", ";
            }
            oss << child;
        });
        oss << " ] ";
        first = true;
        oss << ", dispatch_map [ ";
        std::for_each(item.dispatch_map.cbegin(), item.dispatch_map.cend(), [&first, &oss](std::pair<std::string, std::map<request_key, middleware_index>> method_it){
            if (first) {
                first = false;
            }
            else {
                oss << ", ";
            }
            oss << method_it.first << ": [ ";
            auto first_ = true;
            std::for_each(method_it.second.cbegin(), method_it.second.cend(), [&first_, &oss](std::pair<request_key, middleware_index> route_it){
                if (first_) {
                    first_ = false;
                }
                else {
                    oss << ", ";
                }
                oss << route_it.first.uri_.path;
            });
            oss << " ] ";
        });
        oss << " ] ";
        oss << "}" << std::endl;
    });
#ifdef DUMP_MIDDLEWARE_ITEM
    std::cerr << oss.str();
#endif
    return middleware_controller::handler(vector_);
}
// Flatten this single node into a middleware_chain.
// RAW_MIDDLEWARE becomes a one-item chain; APP and DISPATCHER delegate to
// their dedicated generators; any other type yields a null chain.
operator std::shared_ptr<middleware_chain>() {
    switch (type) {
    case app_type::RAW_MIDDLEWARE: {
        auto chain = std::make_shared<middleware_chain>();
        chain->push_back(middleware_item{ 0, prefix, method_, path_, generate_raw_middleware(), 1, type });
        return chain;
    }
    case app_type::DISPATCHER:
        return generate_dispatch_middleware();
    case app_type::APP:
        return generate_app_middleware();
    default:
        return nullptr;
    }
}
std::shared_ptr<middleware_chain> generate_app_middleware() { | |
auto chain = std::make_shared<middleware_chain>(); | |
auto it = app_chain->cbegin(); | |
auto cend = app_chain->cend(); | |
middleware_cb app_cb = [this](request &req, response &res){ | |
if (req.match()) { | |
#if MATCHING_LOG | |
std::cerr << "app_cb " << "req " << req.uri().path << " matched " << req.controller->it->index << ":" << req.controller->it->path << " with prefix " << req.uri().path.substr(0, req.key.prefix_length) << " " << req.key.prefix_length << std::endl; | |
#endif | |
req.next(0, next_type::FIRST_CHILD); | |
} | |
else { | |
#if MATCHING_LOG | |
std::cerr << "app_cb " << "req " << req.uri().path << " did not match " << req.controller->it->index << ":" << req.controller->it->path << " " << req.key.prefix_length << std::endl; | |
#endif | |
req.next(); | |
} | |
}; | |
cb = app_cb; | |
while (it != cend) { | |
auto subchain = (std::shared_ptr<middleware_chain>)(**it); | |
for (auto it_sub = subchain->cbegin(); it_sub != subchain->cend(); it_sub++) { | |
chain->push_back(*it_sub); | |
} | |
it++; | |
} | |
chain->push_front(middleware_item{ 0, prefix, method_, path_, cb, chain->size() + 1 /* add self */, type }); | |
return chain; | |
} | |
std::shared_ptr<middleware_chain> generate_dispatch_middleware() { | |
auto chain = std::make_shared<middleware_chain>(); | |
auto it = app_chain->cbegin(); | |
auto cend = app_chain->cend(); | |
middleware_cb dispatch_cb = [this](request &req, response &res){ | |
if (req.match()) { | |
req.key.prefix_length = (*req.controller->request_filters)[req.controller->it->index + 1].prefix.size(); | |
#if MATCHING_LOG | |
std::cerr << "dispatch_cb " << "req " << req.uri().path << " matched " << req.controller->it->index << ":" << req.controller->it->path << " with prefix " << req.uri().path.substr(0, req.key.prefix_length) << " " << req.key.prefix_length << std::endl; | |
#endif | |
req.next(0, next_type::MATCHING_CHILD); | |
} | |
else { | |
#if MATCHING_LOG | |
std::cerr << "dispatch_cb " << "req " << req.uri().path << " did not match " << req.controller->it->index << ":" << req.controller->it->path << " " << req.key.prefix_length << std::endl; | |
#endif | |
req.next(); | |
} | |
}; | |
cb = dispatch_cb; | |
while (it != cend) { | |
auto subchain = (std::shared_ptr<middleware_chain>)(**it); | |
for (auto it_sub = subchain->cbegin(); it_sub != subchain->cend(); it_sub++) { | |
chain->push_back(*it_sub); | |
} | |
it++; | |
} | |
chain->push_front(middleware_item{ 0, prefix, method_, path_, cb, chain->size() + 1 /* add self */, type }); | |
return chain; | |
} | |
// Wrap the user callback so it runs only when the request matches this
// middleware's method/path; on a mismatch the chain simply advances.
// FIX: removed an unreachable fast path that was disabled with
// `if (false && method_ == "*" && path_ == "*") return cb;` — dead code.
// (Re-add the condition without `false &&` to restore the optimization of
// returning the raw callback unwrapped for match-everything routes.)
middleware_cb generate_raw_middleware() {
    return [this](request &req, response &res) {
        if (req.match()) { // req.is_matched is reset, req.key.prefix_length is updated
#if MATCHING_LOG
            std::cerr << "raw_middleware_cb " << "req " << req.uri().path << " matched " << req.controller->it->index << ":" << req.controller->it->path << " with prefix " << req.uri().path.substr(0, req.key.prefix_length) << " " << req.key.prefix_length << std::endl;
#endif
            this->cb(req, res);
        }
        else {
#if MATCHING_LOG
            std::cerr << "raw_middleware_cb " << "req " << req.uri().path << " did not match " << req.controller->it->index << ":" << req.controller->it->path << " " << req.key.prefix_length << std::endl;
#endif
            req.next();
        }
    };
}
// --- use(): register middleware on this app ------------------------------
// All overloads append to app_chain, re-parent the child to *this, and
// return *this so calls can be chained.

// Mount an existing sub-app as-is.
web_app &use(std::shared_ptr<web_app> app) {
    app->parent_ = shared_from_this();
    app_chain->push_back(app);
    return *this;
}
// Register a raw callback matching every method and every path.
web_app &use(middleware_cb cb) {
    auto wrapped = std::make_shared<web_app>("*", "*", cb);
    wrapped->parent_ = shared_from_this();
    app_chain->push_back(wrapped);
    return *this;
}
// Mount a sub-app under `path`.
web_app &use(std::string path, std::shared_ptr<web_app> app) {
    app->path_ = path;
    app->parent_ = shared_from_this();
    app_chain->push_back(app);
    return *this;
}
// Register a raw callback for every method under `path`.
web_app &use(std::string path, middleware_cb cb) {
    auto wrapped = std::make_shared<web_app>("*", path, cb);
    wrapped->parent_ = shared_from_this();
    app_chain->push_back(wrapped);
    return *this;
}
// --- all(): any-method routes --------------------------------------------
// Thin wrappers over use(); the path-only overload instead creates a child
// APP and returns *that child* so routes can be attached to it.
web_app &all(middleware_cb cb) {
    return use(cb);
}
web_app &all(std::string path = "*") {
    auto child = std::make_shared<web_app>(app_type::APP, "*", path);
    child->parent_ = shared_from_this();
    app_chain->push_back(child);
    return *child; // note: returns the child, not *this
}
web_app &all(std::shared_ptr<web_app> app) {
    return use(app);
}
web_app &all(std::string path, middleware_cb cb) {
    return use(path, cb);
}
web_app &all(std::string path, std::shared_ptr<web_app> app) {
    return use(path, app);
}
// --- get(): GET-method routes --------------------------------------------
// Mount an existing sub-app at `path`, forcing its method to GET.
web_app &get(std::string path, std::shared_ptr<web_app> app) {
    app->path_ = path;
    app->method_ = "GET";
    app->parent_ = shared_from_this();
    app_chain->push_back(app);
    return *this;
}
// Register a raw GET callback at `path`.
web_app &get(std::string path, middleware_cb cb) {
    auto route = std::make_shared<web_app>("GET", path, cb);
    route->parent_ = shared_from_this();
    app_chain->push_back(route);
    return *this;
}
// Create a child GET app at `path`; returns the child for further chaining.
web_app &get(std::string path = "*") {
    auto child = std::make_shared<web_app>(app_type::APP, "GET", path);
    child->parent_ = shared_from_this();
    app_chain->push_back(child);
    return *child;
}
// --- post(): POST-method routes ------------------------------------------
// Mount an existing sub-app at `path`, forcing its method to POST.
web_app &post(std::string path, std::shared_ptr<web_app> app) {
    app->path_ = path;
    app->method_ = "POST";
    app->parent_ = shared_from_this();
    app_chain->push_back(app);
    return *this;
}
// Register a raw POST callback at `path`.
web_app &post(std::string path, middleware_cb cb) {
    auto route = std::make_shared<web_app>("POST", path, cb);
    route->parent_ = shared_from_this();
    app_chain->push_back(route);
    return *this;
}
// Create a child POST app at `path`; returns the child for further chaining.
web_app &post(std::string path = "*") {
    auto child = std::make_shared<web_app>(app_type::APP, "POST", path);
    child->parent_ = shared_from_this();
    app_chain->push_back(child);
    return *child;
}
// Create a DISPATCHER child at `path` and return it.  A dispatcher routes
// to a matching child via its dispatch_map lookup instead of walking every
// child in registration order.
web_app &dispatch(std::string path = "*") {
    auto dispatcher = std::make_shared<web_app>(app_type::DISPATCHER, "*", path);
    dispatcher->parent_ = shared_from_this();
    app_chain->push_back(dispatcher);
    return *dispatcher;
}
// Return the parent app; a root (or orphaned) app returns itself.
// lock() yields an empty shared_ptr exactly when the weak_ptr is expired,
// so a single lock-then-test replaces the expired()+lock() pair.
web_app &parent() {
    auto parent_ptr = parent_.lock();
    return parent_ptr ? *parent_ptr : *this;
}
// Set this app's mount point and register the compiled handler with the
// nghttp2 asio server.  The request_cb conversion compiles the whole tree.
web_app &mount(std::string mount_point, http2 &server) {
    path_ = mount_point;
    auto handler = (request_cb)*this;
    server.handle(path_, handler);
    return *this;
}
std::string path() { | |
return path_; | |
} | |
public:
    app_type type;                                                  // APP, DISPATCHER, or RAW_MIDDLEWARE
    std::string path_;                                              // route path; "*" matches everything
    std::string method_;                                            // HTTP method; "*" matches any method
    middleware_cb cb;                                               // user callback (RAW_MIDDLEWARE) or generated guard (APP/DISPATCHER)
    std::shared_ptr<std::list<std::shared_ptr<web_app>>> app_chain; // children, in registration order
    std::string prefix;                                             // accumulated mount prefix, filled in by set_prefix()
    std::weak_ptr<web_app> parent_;                                 // weak to avoid a parent<->child shared_ptr cycle
}; | |
// Out-of-line forwarder: defined after middleware_controller so the call
// can be expressed (request is declared before the controller type).
inline void request::next(uint32_t err, next_type type) {
    controller->next(err, type);
}
// Test whether the current middleware item matches this request.
// Side effects: on a successful prefix match, key.prefix_length advances
// past the matched route segment; a pending is_matched flag (set by a
// dispatcher's map lookup) is consumed and cleared on the first call.
inline bool request::match() { // side effects: update req.key.prefix_length on matching
    if (is_matched) {
#if MATCHING_LOG
        std::cerr << key.uri_.path << " request::match() resetting is_matched = true as false; prefix_length = " << key.prefix_length << ":" << key.uri_.path.substr(0, key.prefix_length) << std::endl;
#endif
        is_matched = false; // reset is_matched flag
        return true; // matched on dispatching
    }
    else {
        const middleware_item &item = *(controller->it);
        if (item.method == "*" || item.method == key.method_) {
            // method matched
            const std::string &route = item.path;
            const std::string &path = key.uri_.path;
            std::size_t route_size, key_size;
            switch (item.r_type) {
            case route_type::ALL: // *
                // no req.key.prefix_length update
                return true;
            case route_type::PREFIX: // /routepath
#if MATCHING_LOG
                if (path.size() < key.prefix_length) {
                    std::cerr << path << " " << "path.size() < key.prefix_length" << " prefix_length = " << key.prefix_length << std::endl;
                }
#endif
                // compare the unconsumed tail of the request path against the
                // full route path
                key_size = path.size() - key.prefix_length;
                route_size = route.size();
                if (route_size > key_size) {
                    return false; // /shortpath never matches /long/routepath
                }
                else {
                    // route_size <= key_size
                    if (path.compare(key.prefix_length, route_size, route, 0, route_size) == 0) {
                        if (route_size == key_size) {
                            // exact match
                            // NOTE(review): both switch arms below are
                            // currently identical; kept separate, presumably
                            // for future type-specific prefix handling
                            switch (item.type) {
                            case app_type::APP:
                            case app_type::DISPATCHER:
                                key.prefix_length += route_size;
                                break;
                            case app_type::RAW_MIDDLEWARE:
                            default:
                                key.prefix_length += route_size;
                                break;
                            }
                            return true;
                        }
                        else {
                            // route_size < key_size
                            if (path[key.prefix_length + route_size] == '/') {
                                // path is a subfolder of route; path matches route
                                switch (item.type) {
                                case app_type::APP:
                                case app_type::DISPATCHER:
                                    key.prefix_length += route_size;
                                    break;
                                case app_type::RAW_MIDDLEWARE:
                                default:
                                    key.prefix_length += route_size;
                                    break;
                                }
                                return true;
                            }
                            else {
                                // path is not a subfolder of route; path starts with route; path does not match route
                                return false;
                            }
                        }
                    }
                    else {
                        return false; // path does not start with route
                    }
                }
                break;
            case route_type::PARAMETERIZED_PREFIX: // /routepath/:param1/:param2
                // only the literal part before "/:" participates in matching
                key_size = path.size() - key.prefix_length;
                route_size = item.prefix_length;
                if (route_size > key_size) {
                    return false; // /shortpath never matches /long/routepath
                }
                else {
                    // route_size <= key_size
                    if (path.compare(key.prefix_length, route_size, route, 0, route_size) == 0) {
                        if (route_size == key_size) {
                            // missing parameters
                            return false;
                        }
                        else {
                            // route_size < key_size
                            if (path[key.prefix_length + route_size] == '/') {
                                // path is a subfolder of route; path matches route
                                // parameters start at path + key.prefix_length + route_size + 1
                                switch (item.type) {
                                case app_type::APP:
                                case app_type::DISPATCHER:
                                    key.prefix_length += route_size; // points at '/' followed by parameters
                                    break;
                                case app_type::RAW_MIDDLEWARE:
                                default:
                                    key.prefix_length += route_size;
                                    break;
                                }
                                return true;
                            }
                            else {
                                // path is not a subfolder of route; path starts with route; path does not match route
                                return false;
                            }
                        }
                    }
                    else {
                        // path does not start with route
                        return false;
                    }
                }
                break;
            case route_type::REGEX: // TODO
                return false;
            case route_type::REQUEST: // unexpected route_type in item
            default:
                return false;
            }
        }
        else {
            return false; // method not matched
        }
    }
}
inline void response::write_head(unsigned int status_code, header_map h) { | |
status_code_ = status_code; | |
header_ = std::move(h); | |
if (!expect_response_body(controller->req.method(), status_code_)) { | |
is_header_written = true; | |
res.write_head(status_code_, header_); | |
} | |
else { | |
// calling res.write_head() is pending | |
} | |
} | |
// Finish the response with a complete body string.
// With STREAMING_SUPPORT and active (non-buffered) response filters, the
// string is converted to a pull-style generator and handed to the
// generator overload of end().  Otherwise the body is buffered, a
// respond_body continuation is installed, and either the first response
// filter or respond_body itself is invoked.
inline void response::end(std::string data) {
#if STREAMING_SUPPORT
#if STREAMING_LOG
    std::cerr << "[response::end(std::string data)]: response_filters.size() = " << response_filters.size() << std::endl;
#endif
    if (!force_buffered_filtering && response_filters.size() > 0) {
        // streaming
#if STREAMING_LOG
        std::cerr << "[response::end(std::string data)] streaming with data = " << data << std::endl;
#endif
        // shared_ptrs keep the body and remaining-byte counter alive inside
        // the generator across repeated pulls
        auto left_ = std::make_shared<std::size_t>(data.size());
        auto data_ = std::make_shared<std::string>(std::move(data));
        end([data_, left_](uint8_t *buf, size_t len, uint32_t *data_flags) {
            auto bytes = std::min(len, *left_);
            std::copy_n(data_->data() + data_->size() - *left_, bytes, buf);
            *left_ -= bytes;
            if (*left_ == 0) {
                *data_flags |= NGHTTP2_DATA_FLAG_EOF;
            }
            return bytes;
        });
        return;
    }
#if STREAMING_LOG
    std::cerr << "[response::end(std::string data)] non-streaming with data = " << data << std::endl;
#endif
#endif // STREAMING_SUPPORT
    // buffered path: `data` was not moved above, so this copy is intact
    body = data;
    respond_body = [](response &res){
        if (!res.is_header_written) {
            res.is_header_written = true;
#if STREAMING_LOG
            std::cerr << "[response::end(std::string data) respond_body callback] res.res.write_head(); status_code = " << res.status_code() << std::endl;
#endif
            res.res.write_head(res.status_code(), res.header());
        }
#if STREAMING_LOG
        std::cerr << "[response::end(std::string data) respond_body callback] res.res.end(res.body); res.body = " << res.body << std::endl;
#endif
        res.res.end(res.body);
    };
    // NOTE: the `else` after the #endif below binds to whichever `if`
    // was compiled in — both branches share the same fall-through.
#if STREAMING_SUPPORT
    it = response_filters.begin();
    if (it != response_filters.end()) {
#if STREAMING_LOG
        std::cerr << "[response::end(std::string data)] calling response_filters " << std::endl;
#endif
        // lazily wrap a streaming filter into its buffered form
        if (!it->buffered) {
            it->buffered = wrap_stream_cb(it->streamed, it->on_header);
        }
        (it->buffered)(*this);
    }
#else // !STREAMING_SUPPORT
    it = response_filters.cbegin();
    if (it != response_filters.cend()) {
        (*it)(*this);
    }
#endif // !STREAMING_SUPPORT
    else {
#if STREAMING_LOG
        std::cerr << "[response::end(std::string data)] calling respond_body " << std::endl;
#endif
        respond_body(*this);
    }
}
// Build a response_cb that, when invoked, writes the pending response header
// (if not yet written) and then streams res.body to the client through a
// generator callback handed to res.res.end().
inline response_cb response::start_response() {
  return [](response &res){
#if STREAMING_LOG
    std::cerr << "[response::start_response() callback] called" << std::endl;
#endif
    if (!res.is_header_written) {
      res.is_header_written = true;
      res.res.write_head(res.status_code(), res.header());
    }
    // left_bytes tracks how much of res.body has not been sent yet;
    // shared_ptr keeps it alive across repeated generator invocations
    auto left_bytes = std::make_shared<std::size_t>(res.body.size());
    // NOTE(review): res is captured by reference into a callback stored by
    // res.res.end(); this assumes the response object outlives the stored
    // generator — confirm against the nghttp2 asio stream lifetime rules.
    res.res.end([&res, left_bytes](uint8_t *buf, std::size_t len, uint32_t *data_flags) {
      auto written_bytes = std::min(len, *left_bytes);
      // copy the next unsent slice of the body into the nghttp2 buffer
      std::copy_n(res.body.data() + res.body.size() - *left_bytes, written_bytes, buf);
      *left_bytes -= written_bytes;
      if (*left_bytes <= 0) { // *left_bytes is unsigned, so effectively == 0
        *data_flags |= NGHTTP2_DATA_FLAG_EOF;
        if (res.is_trailer_expected) {
          // keep the stream open so the trailer frame can still be written
          *data_flags |= NGHTTP2_DATA_FLAG_NO_END_STREAM;
          res.do_write_trailer();
        }
      }
      return written_bytes;
    });
  };
}
#if STREAMING_SUPPORT | |
// Register a buffered (whole-body) response filter. Buffered filters need the
// complete body, so this forces buffered filtering for the whole response.
inline void response::on_response(response_cb cb) {
#if STREAMING_LOG
  std::cerr << "[response::on_response(response_cb)]: response_filters.size() = " << response_filters.size() << std::endl;
#endif
#if STREAMING_LOG
  std::cerr << "[response::on_response(response_cb)]: response_filters.push_front(filter_item{index, cb})" << std::endl;
#endif
  // the brace-init expression is already a prvalue, so the former std::move()
  // wrapper around it was redundant; cb itself is moved to avoid copying the
  // std::function state
  response_filters.push_front(filter_item{this->controller->it->index, std::move(cb)});
  force_buffered_filtering = true;
}
// Register a streaming response filter together with its on_header callback.
// Unlike the buffered overload, this does not force whole-body buffering.
inline void response::on_response(response_cb on_header, stream_cb cb) {
#if STREAMING_LOG
  std::cerr << "[response::on_response(stream_cb)]: response_filters.size() = " << response_filters.size() << std::endl;
#endif
#if STREAMING_LOG
  std::cerr << "[response::on_response(stream_cb)]: response_filters.push_front(filter_item{index, cb, on_header})" << std::endl;
#endif
  // the brace-init expression is already a prvalue, so the former std::move()
  // wrapper around it was redundant; the callbacks are moved to avoid copying
  // std::function state
  response_filters.push_front(filter_item{this->controller->it->index, std::move(cb), std::move(on_header)});
}
#endif // STREAMING_SUPPORT | |
inline void response::setup_on_close() { | |
res.on_close([this](uint32_t err){ | |
for (auto cb: on_close_callbacks) { | |
cb(err); // close callbacks are called in the reversed order of their registrations | |
} | |
if (this->controller) { | |
if (!this->is_push) { | |
this->controller->req.controller = nullptr; | |
} | |
this->controller = nullptr; | |
} | |
}); | |
} | |
// Strict weak ordering for request_key, delegated to web_app::key_less so the
// map ordering and the router share one comparison.
inline bool operator<(const request_key &a, const request_key &b) {
  const bool a_precedes_b = web_app::key_less(a, b);
  return a_precedes_b;
}
#if STREAMING_SUPPORT | |
#if GENERATOR_STREAM | |
class generator_stream; | |
/* | |
Helper classes for streaming filters: | |
Notes on streaming architecture: | |
- generator_cb (std::function<ssize_t (uint8_t *buf, std::size_t len, uint32_t *data_flags)>) as data source and sink | |
- Wrapped as generator_cb to be consumed by sink like this (in a rough conceptual depiction): | |
generator_cb sink() { return filter(generator_cb source); } - nested calls to source generator_cb | |
generator_stream::filter(buf, len, data_flags) can serve as this filter() function above | |
- The source end is cb in res.end(generator_cb cb) | |
- The sink end is the nghttp2 engine's call to generator_cb after res.end(generator_cb) | |
nghttp2 sink -(call)-> filterN -> filterN-1 -> ... -> filter1 -> source generator_cb | |
HTTP2 engine <=(data)= filterN <= filterN-1 <= ... <= filter1 <= data from generator_cb | |
- Back-pressure from sink is achieved by just skipping calls to the filterN callback | |
- NGHTTP2_ERR_DEFERRED can propagate from source through sink with latency | |
- Latency of deferred status propagation is controlled by wrapping filters in backyard (see response::end()) | |
class generator_stream: std::iostream on generator_streambuf | |
- Option #1: Class can be derived to create a custom filter stream with overridden do_*() methods | |
- Option #1-2: Use boost_filtering_streambuf_adaptor (derived class defined below) as an adaptor | |
- Option #1-3: Add callback lambdas as member variables and call them from do_*() in a derived class, | |
which should be conceptually the same as class derivation (with more overheads) | |
- Option #2: Methods (readsome(), check_status(), ...) can be called for input buffering | |
- If no resizing nor output buffering is required, this can be the most efficient scheme | |
- Option #3: Do not use generator_stream if no buffering nor resizing is required in filtering | |
- Input buffer == Output buffer == Filtering workspace | |
class generator_streambuf: std::streambuf buffer for generator_stream | |
- This streambuf is customized in a non-standard way to call underflow() prematurely | |
to fill the input buffer as much as possible non-blockingly | |
methods: readsome() - read data from input buffer non-blockingly | |
can be read to [buf, len) (output buffer) directly | |
check_status(data_flags) - returns nghttp2 error code if required | |
sink <= |output buffer| <= (filter) <= |input buffer| <= source | |
write() do_peek() - peek input buffer for a next chunk (filterable unit) | |
operator << do_filter() - filter a chunk and write to output buffer | |
do_close() - flush filter buffer (if exists) to output buffer on EOF | |
override do_*() to build a custom filter | |
Input buffer: | |
_M_gbuf (std::string) | |
|read |buffered |empty | | |
pback() gptr() egptr() _M_gbuf[_M_gbuf.size() - 1] | |
^ ^ | |
read from here fill from source (generator_cb) | |
Output buffer: | |
non-overflowing mode: output buffer [buf, buf + len) is being filled | |
generator_cb buffer overflow buffer | |
|written | | + | | | |
buf pptr() buf+len _M_pbuf_overflow (std::string) | |
=_M_pbuf ^ | |
=pbase() write from here | |
overflowing mode: [buf, buf + len) is returned to the caller sink (generator_cb) on return | |
generator_cb buffer overflow buffer (expands on demand) | |
|written (filled) | + |written | empty | | |
buf=_M_pbuf buf+len _M_pbuf_overflow (std::string) | |
^ ^ ^ | |
pbase() pptr() epptr() | |
write from here | |
overflowing mode before setbuf() and after flush(): | |
_M_pbuf = nullptr | |
overflow buffer (expands on demand) | |
|written | empty | | |
_M_pbuf_overflow (std::string) | |
^ ^ ^ | |
pbase() pptr() epptr() | |
write from here | |
*/ | |
class generator_streambuf: public std::streambuf { | |
public: | |
friend class generator_stream; | |
// for reading and writing | |
generator_streambuf(generator_cb generator, std::streamsize buf_size = 16384) | |
: _M_mode(std::ios::in | std::ios::out), | |
_M_generator(generator), | |
_M_buf_size(buf_size), | |
_M_state(STATE_INITIAL), | |
_M_gbuf(buf_size, '\0'), | |
_M_pbuf(nullptr), | |
_M_pbuf_size(0), | |
_M_pbuf_overflow(buf_size, '\0') { | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf() for reading and writing] buf_size = " << buf_size << std::endl; | |
std::cerr << "[generator_streambuf() for reading and writing] _M_gbuf.size() = " << _M_gbuf.size() << std::endl; | |
#endif | |
auto gbuf_begin = &_M_gbuf[0]; | |
auto gbuf_end = gbuf_begin + _M_gbuf.size(); | |
setg(gbuf_begin, gbuf_end, gbuf_end); // underflow state | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf() for reading and writing] overflow buf_size = " << buf_size << std::endl; | |
#endif | |
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size()); // empty | |
} | |
typedef char *char_ptr; | |
std::string &peek(char_ptr &_gptr, char_ptr &_egptr) { | |
_gptr = gptr(); | |
_egptr = egptr(); | |
return _M_gbuf; | |
} | |
std::streamsize buf_size() { | |
return _M_buf_size; | |
} | |
bool is_overflowing() { | |
return pbase() == &_M_pbuf_overflow[0]; | |
} | |
std::streamsize out_avail() { // excluding overflow buffer | |
if (is_overflowing()) { | |
return _M_pbuf_size; | |
} | |
else { | |
return pptr() - pbase(); | |
} | |
} | |
std::streamsize out_avail_including_overflow() { // including overflow buffer | |
if (is_overflowing()) { | |
return _M_pbuf_size + (pptr() - pbase()); | |
} | |
else { | |
return pptr() - pbase(); | |
} | |
} | |
enum generator_state { | |
STATE_INITIAL, | |
STATE_ERROR, | |
STATE_EOF, | |
STATE_OPEN, | |
STATE_DEFER | |
}; | |
generator_state state() { | |
return _M_state; | |
} | |
protected: | |
generator_cb _M_generator; | |
std::streamsize _M_buf_size; | |
std::ios_base::openmode _M_mode; | |
generator_state _M_state; | |
std::string _M_gbuf; | |
char *_M_pbuf; // not the owner of the buffer; possibly pointing within _M_gbuf.data() of the next generator_streambuf object in the filter chain | |
std::streamsize _M_pbuf_size; | |
std::string _M_pbuf_overflow; | |
virtual std::streampos seekoff( | |
std::streamoff off, | |
std::ios_base::seekdir way, | |
std::ios_base::openmode which = std::ios_base::in | std::ios_base::out) { | |
if (which & std::ios_base::in) { | |
if (way == std::ios_base::cur) { | |
auto next_gptr = gptr() + off; | |
if (next_gptr <= egptr()) { | |
setg(eback(), next_gptr, egptr()); | |
return gptr() - eback(); // dummy offset value | |
} | |
} | |
} | |
if (which & std::ios_base::out) { | |
// not implemented | |
} | |
return (pos_type(off_type(EOF))); | |
} | |
virtual std::streamsize showmanyc() { | |
std::streamsize _ret = 0; | |
switch (_M_state) { | |
case STATE_ERROR: | |
_ret = -1; | |
break; | |
case STATE_EOF: | |
_ret = egptr() - gptr(); | |
if (_ret <= 0) { | |
_ret = -1; | |
} | |
break; | |
case STATE_INITIAL: | |
case STATE_OPEN: | |
underflow(); // non-standard way of buffering to handle non-blocking I/O for generator | |
switch (_M_state) { | |
case STATE_ERROR: | |
_ret = -1; | |
break; | |
case STATE_EOF: | |
_ret = egptr() - gptr(); | |
if (_ret <= 0) { | |
_ret = -1; | |
} | |
break; | |
case STATE_INITIAL: | |
case STATE_OPEN: | |
_ret = egptr() - gptr(); | |
break; | |
case STATE_DEFER: | |
_ret = egptr() - gptr(); | |
if (_ret == 0) { | |
_M_state = STATE_OPEN; | |
} | |
break; | |
default: | |
_ret = 0; | |
break; | |
} | |
break; | |
case STATE_DEFER: | |
_ret = egptr() - gptr(); | |
if (_ret == 0) { | |
_M_state = STATE_OPEN; | |
} | |
break; | |
default: | |
_ret = 0; | |
break; | |
} | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::showmanyc()] _ret = " << _ret << std::endl; | |
#endif | |
return _ret; | |
} | |
virtual int_type underflow() { | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::underflow()] " << std::endl; | |
std::string st; | |
switch (_M_state) { | |
case STATE_INITIAL: st = "INITIAL"; break; | |
case STATE_ERROR: st = "ERROR"; break; | |
case STATE_EOF: st = "EOF"; break; | |
case STATE_OPEN: st = "OPEN"; break; | |
case STATE_DEFER: st = "DEFER"; break; | |
default: st = "unknown"; break; | |
} | |
std::cerr << "[generator_streambuf::underflow()] _M_state = " << st << std::endl; | |
#endif | |
int_type _ret = traits_type::eof(); | |
if (_M_state == STATE_ERROR) { | |
return _ret; | |
} | |
auto curr_ptr = gptr(); | |
if (_M_state == STATE_EOF) { | |
if (!curr_ptr || curr_ptr >= egptr()) { | |
// no more readable data in buffer | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::underflow()] no more readable data in buffer " << std::endl; | |
#endif | |
_ret = traits_type::eof(); | |
} | |
else { | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::underflow()] readable data remain in buffer : " << (egptr() - curr_ptr) << std::endl; | |
#endif | |
_ret = traits_type::to_int_type(*curr_ptr); // readable data remain in buffer | |
} | |
} | |
else { | |
auto gbuf_begin = &_M_gbuf[0]; | |
auto gbuf_end = gbuf_begin + _M_gbuf.size(); | |
if (!curr_ptr || curr_ptr >= gbuf_begin) { | |
if (!curr_ptr) { | |
curr_ptr = gbuf_begin; | |
setg(gbuf_begin, curr_ptr, curr_ptr); | |
} | |
if (gbuf_end <= curr_ptr) { | |
curr_ptr = gbuf_begin; // rewind the read ptr | |
setg(gbuf_begin, curr_ptr, curr_ptr); | |
} | |
else if (gbuf_begin < curr_ptr) { | |
auto bytes_in_buffer = egptr() - curr_ptr; | |
std::copy_n(curr_ptr, bytes_in_buffer, gbuf_begin); // move the remaining bytes to gbuf_begin | |
curr_ptr = gbuf_begin; | |
setg(gbuf_begin, curr_ptr, curr_ptr + bytes_in_buffer); | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::underflow()] move remaining bytes to gbuf_begin; bytes = " << bytes_in_buffer << std::endl; | |
#endif | |
} | |
#if STREAMING_LOG | |
if (_M_state == STATE_DEFER) | |
std::cerr << "[generator_streambuf::underflow()] state updated from DEFER to OPEN " << std::endl; | |
#endif | |
_M_state = STATE_OPEN; | |
std::size_t buf_rest = gbuf_end - egptr(); | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::underflow()] _M_gbuf.size() = " << _M_gbuf.size() << std::endl; | |
std::cerr << "[generator_streambuf::underflow()] buf_rest = " << buf_rest << std::endl; | |
#endif | |
uint32_t data_flags = 0; | |
ssize_t bytes; | |
auto append_ptr = egptr(); | |
// non-blockingly fill buffer from generator | |
while (buf_rest > 0 && (bytes = _M_generator((uint8_t *)append_ptr, buf_rest, &data_flags)) > 0) { | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::underflow()] _M_generator bytes = " << bytes << std::endl; | |
std::cerr << "[generator_streambuf::underflow()] _M_generator buf_rest = " << buf_rest << std::endl; | |
#endif | |
append_ptr += bytes; | |
buf_rest -= bytes; | |
setg(gbuf_begin, curr_ptr, append_ptr); | |
data_flags = 0; | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::underflow()] append_ptr - curr_ptr = " << (append_ptr - curr_ptr) << std::endl; | |
#endif | |
} | |
if (bytes == 0 || buf_rest == 0) { | |
if (data_flags & NGHTTP2_DATA_FLAG_EOF) { | |
// EOF | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::underflow()] EOF data_flags = " << data_flags << std::endl; | |
#endif | |
_M_state = STATE_EOF; | |
} | |
auto readable_bytes = append_ptr - curr_ptr; | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::underflow()] out of generator read loop; append_ptr - curr_ptr = " << readable_bytes << std::endl; | |
#endif | |
setg(gbuf_begin, curr_ptr, append_ptr); | |
if (readable_bytes > 0) { | |
_ret = traits_type::to_int_type(*curr_ptr); | |
} | |
} | |
else if (bytes < 0) { | |
if (bytes == NGHTTP2_ERR_DEFERRED) { | |
if (append_ptr - curr_ptr > 0) { | |
_ret = traits_type::to_int_type(*curr_ptr); | |
} | |
#if STREAMING_LOG | |
if (_M_state == STATE_OPEN) | |
std::cerr << "[generator_streambuf::underflow()] state updated from OPEN to DEFER" << std::endl; | |
#endif | |
_M_state = STATE_DEFER; | |
} | |
else { // errors | |
if (append_ptr - curr_ptr > 0) { | |
_ret = traits_type::to_int_type(*curr_ptr); | |
} | |
_M_state = STATE_ERROR; | |
throw NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; | |
} | |
} | |
} | |
else { | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::underflow()] noop in underflow()" << std::endl; | |
#endif | |
_ret = traits_type::to_int_type(*curr_ptr); | |
} | |
} | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::underflow()] _ret = " << _ret << std::endl; | |
#endif | |
return _ret; | |
} | |
virtual std::streambuf *setbuf(char *buf_begin, std::streamsize buf_size) { | |
// discard the current non-overflowing buffer, which may be invalid | |
_M_pbuf = buf_begin; | |
_M_pbuf_size = buf_size; | |
if (is_overflowing()) { | |
// fill with the overflowing data | |
std::streamsize overflowing_size = pptr() - pbase(); | |
if (overflowing_size <= _M_pbuf_size) { | |
std::copy_n(&_M_pbuf_overflow[0], overflowing_size, _M_pbuf); | |
if (overflowing_size < _M_pbuf_size) { | |
// non-overflowing mode | |
setp(_M_pbuf, _M_pbuf + _M_pbuf_size); | |
pbump(overflowing_size); | |
} | |
else { | |
// overflowing mode | |
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size()); | |
} | |
} | |
else { | |
// still in overflowing mode | |
std::streamsize new_overflowing_size = overflowing_size - _M_pbuf_size; | |
// fill non-overflowing buffer with the overflow buffer data | |
std::copy_n(&_M_pbuf_overflow[0], _M_pbuf_size, _M_pbuf); | |
// move overflowing buffer data | |
std::copy_n(&_M_pbuf_overflow[0] + overflowing_size, new_overflowing_size, &_M_pbuf_overflow[0]); | |
// update the pointers | |
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size()); | |
pbump(new_overflowing_size); | |
} | |
} | |
else { | |
// update the pointers | |
// non-overflowing mode | |
setp(_M_pbuf, _M_pbuf + _M_pbuf_size); | |
} | |
return this; | |
} | |
virtual int_type overflow(int_type c = traits_type::eof()) { | |
const bool c_is_eof = traits_type::eq_int_type(c, traits_type::eof()); | |
if (pptr() < epptr()) { | |
// room in buffer | |
} | |
else { | |
// no room in the current buffer | |
if (is_overflowing()) { | |
// expand the overflow buffer | |
_M_pbuf_overflow.resize(_M_pbuf_overflow.size() * 2); // resize by 2 times of the current size | |
} | |
else { | |
// switch to overflowing mode | |
} | |
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size()); | |
} | |
if (!c_is_eof) { | |
pbump(1); | |
*pptr() = traits_type::to_char_type(c); | |
} | |
return traits_type::not_eof(c); | |
} | |
virtual std::streamsize xsputn(const char *str, std::streamsize bytes) { | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::xsputn] bytes = " << bytes << std::endl; | |
#if STREAMING_DUMP | |
std::cerr << "[generator_streambuf::xsputn] output = \n\""; | |
std::cerr.write(str, bytes); | |
std::cerr << "\" end of output" << std::endl; | |
#endif | |
#endif | |
if (is_overflowing()) { | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::xsputn] is_overflowing = true" << std::endl; | |
#endif | |
if (pptr() + bytes <= epptr()) { | |
// within the overflow buffer | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::xsputn] within the overflow buffer" << std::endl; | |
#endif | |
std::copy_n(str, bytes, pptr()); | |
pbump(bytes); | |
} | |
else { | |
// overflows the current overflow buffer | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::xsputn] overflows the current overflow buffer; epptr() - pptr() = " << (epptr() - pptr()) | |
<< " _M_pbuf_overflow.size() = " << _M_pbuf_overflow.size() | |
<< " pptr() - &_M_pbuf_overflow[0] = " << (pptr() - &_M_pbuf_overflow[0]) << std::endl; | |
#endif | |
std::streamsize current_overflow_size = pptr() - pbase(); | |
std::streamsize required_overflow_size = current_overflow_size + bytes; | |
std::streamsize targeted_overflow_size = _M_pbuf_overflow.size(); | |
while (targeted_overflow_size < required_overflow_size) { | |
targeted_overflow_size *= 2; | |
} | |
// expand the overflow buffer | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::xsputn] expands the overflow buffer; targeted_overflow_size = " << targeted_overflow_size << std::endl; | |
#endif | |
_M_pbuf_overflow.resize(targeted_overflow_size); | |
// update pointers | |
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size()); | |
pbump(current_overflow_size); | |
// write to the expanded buffer | |
std::copy_n(str, bytes, pptr()); | |
} | |
} | |
else { | |
// not in the overflowing mode | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::xsputn] is_overflowing = false" << std::endl; | |
#endif | |
if (pptr() + bytes <= epptr()) { | |
// within the _M_pbuf buffer | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::xsputn] pptr() + bytes <= epptr(); within the _M_pbuf buffer" << std::endl; | |
#endif | |
std::copy_n(str, bytes, pptr()); | |
pbump(bytes); | |
if (pptr() >= epptr()) { | |
// transition to overflowing mode | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::xsputn] pptr() >= epptr() after writing; transition to overflowing mode" << std::endl; | |
#endif | |
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size()); | |
} | |
} | |
else { | |
// split into 2 buffers | |
std::streamsize remaining_buffer_size = epptr() - pptr(); | |
std::streamsize overflowing_size = bytes - remaining_buffer_size; | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::xsputn] split into 2 buffers; " | |
<< "remaining_buffer_size = " << remaining_buffer_size | |
<< " overflowing_size = " << overflowing_size << std::endl; | |
#endif | |
std::copy_n(str, remaining_buffer_size, pptr()); | |
pbump(remaining_buffer_size); | |
// put the remaining data into the overflow buffer | |
if (overflowing_size <= _M_pbuf_overflow.size()) { | |
// within the current overflow buffer | |
} | |
else { | |
// expand the current overflow buffer | |
std::streamsize required_overflow_size = overflowing_size; | |
std::streamsize targeted_overflow_size = _M_pbuf_overflow.size(); | |
while (targeted_overflow_size < required_overflow_size) { | |
targeted_overflow_size *= 2; | |
} | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::xsputn] expands the overflow buffer; targeted_overflow_size = " << targeted_overflow_size << std::endl; | |
#endif | |
_M_pbuf_overflow.resize(targeted_overflow_size); | |
} | |
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size()); | |
std::copy_n(str + remaining_buffer_size, overflowing_size, pptr()); | |
pbump(overflowing_size); | |
} | |
} | |
return bytes; | |
} | |
virtual int sync() { | |
if (_M_mode & std::ios::out) { | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::sync] out_avail = " << out_avail() << std::endl; | |
#endif | |
// invalidate the contents of _M_pbuf | |
if (!is_overflowing()) { | |
// not overflowing | |
// transition to the overflowing mode | |
setp(&_M_pbuf_overflow[0], &_M_pbuf_overflow[0] + _M_pbuf_overflow.size()); | |
} | |
// invalidate the _M_pbuf (not owning) | |
_M_pbuf = nullptr; | |
_M_pbuf_size = 0; | |
return 0; | |
} | |
else { | |
#if STREAMING_LOG | |
std::cerr << "[generator_streambuf::sync] in; noop" << std::endl; | |
#endif | |
return 0; | |
} | |
} | |
}; | |
class generator_stream: public std::iostream, public std::enable_shared_from_this<generator_stream> { | |
public: | |
generator_stream(generator_cb generator, std::streamsize buf_size = 16384) // nghttp2 responds in chunks of 16384 bytes at most | |
: _M_generator_buf(generator, buf_size), std::iostream(&_M_generator_buf) { | |
init(&_M_generator_buf); | |
} | |
virtual ~generator_stream() { | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream::~generator_stream()] destructing" << std::endl; | |
#endif | |
} | |
ssize_t check_status(uint32_t *data_flags) { | |
auto bytes = gcount(); | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream:check_status()] gcount() = " << bytes << std::endl; | |
#endif | |
if (bytes == 0) { | |
if (eof()) { | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream:check_status()] eof() = true" << std::endl; | |
#endif | |
if (fail()) { | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream:check_status()] fail() = true; NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE" << std::endl; | |
#endif | |
bytes = (ssize_t)NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; | |
} | |
else { | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream:check_status()] fail() = false; NGHTTP2_DATA_FLAG_EOF" << std::endl; | |
#endif | |
*data_flags |= NGHTTP2_DATA_FLAG_EOF; | |
} | |
} | |
else if (good()) { | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream:check_status()] good() = true; NGHTTP2_ERR_DEFERRED" << std::endl; | |
#endif | |
// bytes == 0 and good(), i.e., DEFERRED | |
bytes = (ssize_t)NGHTTP2_ERR_DEFERRED; | |
} | |
else { | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream:check_status()] good() = false; NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE" << std::endl; | |
#endif | |
bytes = (ssize_t)NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; | |
} | |
} | |
else { | |
// bytes > 0 | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream:check_status()] bytes = " << bytes << " > 0 ; " << std::endl; | |
#endif | |
if (eof()) { | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream:check_status()] bytes > 0 & eof() = true; NGHTTP2_DATA_FLAG_EOF" << std::endl; | |
#endif | |
*data_flags |= NGHTTP2_DATA_FLAG_EOF; | |
} | |
} | |
return bytes; | |
} | |
bool is_overflowing() { | |
return _M_generator_buf.is_overflowing(); | |
} | |
generator_streambuf *setbuf(uint8_t *buf, std::size_t len) { | |
return (generator_streambuf *)(_M_generator_buf.pubsetbuf((char *)buf, (std::streamsize)len)); | |
} | |
std::streamsize peek_and_filter(bool &incomplete, bool &deferred) { | |
try { | |
incomplete = false; | |
deferred = false; | |
auto in_avail = _M_generator_buf.in_avail(); | |
if (in_avail < 0) { | |
// EOF | |
do_close(); | |
return -1; | |
} | |
char *consumable_begin; | |
char *consumable_end; | |
std::string &buf = _M_generator_buf.peek(consumable_begin, consumable_end); | |
bool found = do_peek(buf, consumable_begin, consumable_end); | |
std::streamsize consumable_size; | |
if (found) { | |
consumable_size = consumable_end - consumable_begin; | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream::peek_and_filter()] comsumable data found; consumable size = " << consumable_size << std::endl; | |
#endif | |
auto filtered = do_filter(consumable_begin, consumable_size); | |
} | |
else { | |
generator_streambuf::generator_state state = _M_generator_buf.state(); | |
if (state == generator_streambuf::STATE_EOF) { | |
// EOF | |
incomplete = true; | |
consumable_size = consumable_end - consumable_begin; | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream::peek_and_filter()] comsumable data not found; EOF; cosumable size = " << consumable_size << std::endl; | |
#endif | |
auto filtered = do_filter(consumable_begin, consumable_size); | |
do_close(); | |
} | |
else if (state == generator_streambuf::STATE_DEFER) { | |
// DEFER | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream::peek_and_filter()] comsumable data not found; DEFER" << std::endl; | |
#endif | |
incomplete = true; | |
deferred = true; | |
consumable_size = 0; | |
} | |
else { | |
consumable_size = consumable_end - consumable_begin; | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream::peek_and_filter()] comsumable data not found; buf.size() = " << buf.size() << " consumable_size = " << consumable_size << std::endl; | |
#if STREAMING_DUMP | |
std::cerr << "[generator_stream::peek_and_filter()] consumable data = \""; | |
std::cerr.write(consumable_begin, consumable_size); | |
std::cerr << "\"" << std::endl; | |
#endif | |
#endif | |
if (buf.size() > consumable_size) { | |
// not EOF, buffer not full | |
if (state == generator_streambuf::STATE_DEFER) { | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream::peek_and_filter()] state = DEFER; comsumable data not found; consume 0 bytes" << std::endl; | |
#endif | |
incomplete = true; | |
deferred = true; | |
consumable_size = 0; | |
} | |
else { | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream::peek_and_filter()] comsumable data not found; consume 0 bytes" << std::endl; | |
#endif | |
incomplete = true; | |
consumable_size = 0; | |
} | |
} | |
else { | |
// not EOF, buffer full | |
incomplete = true; | |
// give up recognizing a newline | |
consumable_size = consumable_end - consumable_begin; | |
auto filtered = do_filter(consumable_begin, consumable_size); | |
} | |
} | |
} | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream::peek_and_filter()] returning consumable_size = " << consumable_size << std::endl; | |
#if STREAMING_DUMP | |
std::cerr << "[generator_stream::peek_and_filter()] consuming \""; | |
std::cerr.write(consumable_begin, consumable_size); | |
std::cerr << "\"" << std::endl; | |
#endif | |
#endif | |
return consumable_size; | |
} | |
catch (...) { | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream::peek_and_filter()] exception thrown" << std::endl; | |
#endif | |
setstate(std::ios_base::failbit); | |
return 0; | |
} | |
} | |
// override do_peek to detect transformable chunks in buffer | |
virtual bool do_peek(std::string &buf, char *&begin, char *&end) { | |
// begin must not be changed | |
return true; | |
} | |
// override do_filter to implement transformation | |
virtual std::streamsize do_filter(char *in_beg, std::streamsize in_size) { | |
write(in_beg, in_size); // identity filter | |
return in_size; | |
} | |
// override do_close to flush and close intermediate buffers | |
virtual void do_close() {} | |
std::streamsize filter(uint8_t *buf, std::size_t len, uint32_t *data_flags) { | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream:filter()] len = " << len << std::endl; | |
#endif | |
if (fail()) { | |
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; | |
} | |
setbuf(buf, len); | |
if (is_overflowing()) { | |
flush(); | |
return len; | |
} | |
auto out_avail = _M_generator_buf.out_avail(); | |
if (eof()) { | |
*data_flags |= NGHTTP2_DATA_FLAG_EOF; | |
flush(); | |
if (out_avail > 0) { | |
return out_avail; | |
} | |
else { | |
return 0; | |
} | |
} | |
// not overflowing, not eof | |
std::streamsize consumed = 0; | |
bool is_incomplete = false; | |
bool is_deferred = false; | |
generator_streambuf::generator_state state = _M_generator_buf.state(); | |
if (state == generator_streambuf::STATE_DEFER) { | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream::filter()] buf state = DEFER; resetting state by underflow and showmanyc" << std::endl; | |
#endif | |
try { | |
_M_generator_buf.underflow(); | |
_M_generator_buf.showmanyc(); | |
} | |
catch (...) { | |
setstate(std::ios_base::failbit); | |
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; | |
} | |
} | |
while (!is_overflowing()) { | |
consumed = peek_and_filter(is_incomplete, is_deferred); | |
if (consumed == -1) { | |
// EOF | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream::filter()] peek_and_filter returned EOF = " << consumed << std::endl; | |
#endif | |
break; | |
} | |
else if (consumed == 0) { | |
// INCOMPLETE or DEFER or ERROR | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream::filter()] peek_and_filter returned 0" << std::endl; | |
#endif | |
if (is_incomplete && !is_deferred) { | |
try { | |
_M_generator_buf.overflow(); | |
_M_generator_buf.showmanyc(); | |
} | |
catch (...) { | |
setstate(std::ios_base::failbit); | |
break; | |
} | |
continue; | |
} | |
break; | |
} | |
_M_generator_buf.pubseekoff(consumed, std::ios_base::cur, std::ios_base::in); | |
} | |
if (fail()) { | |
return NGHTTP2_ERR_TEMPORAL_CALLBACK_FAILURE; | |
} | |
#if STREAMING_LOG | |
if (consumed == 0) { | |
std::cerr << "[generator_stream::filter()] consumed = 0" << std::endl; | |
} | |
#endif | |
out_avail = _M_generator_buf.out_avail(); | |
if (out_avail > 0) { | |
flush(); | |
return out_avail; | |
} | |
else { | |
// out_avail == 0 | |
if (consumed < 0) { | |
*data_flags |= NGHTTP2_DATA_FLAG_EOF; | |
flush(); | |
return 0; | |
} | |
else if (consumed == 0) { | |
flush(); | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream::filter()] returning NGHTTP2_ERR_DEFERRED" << std::endl; | |
#endif | |
return NGHTTP2_ERR_DEFERRED; | |
} | |
else { | |
// consumed > 0 but out_avail == 0 | |
flush(); | |
#if STREAMING_LOG | |
std::cerr << "[generator_stream::filter()] returning 0" << std::endl; | |
#endif | |
return 0; | |
} | |
} | |
} | |
protected: | |
generator_streambuf _M_generator_buf; | |
}; | |
#endif // GENERATOR_STREAM | |
#endif // STREAMING_SUPPORT | |
#if STREAMING_SUPPORT
#if BOOST_FILTER
// Adaptor that drives a chain of one or more boost::iostreams output filters
// (filtering_streambuf<output>) on top of generator_stream: bytes produced by
// the generator are fed into the filter chain via do_filter(), and the
// filtered output is written back into this stream by the terminal
// generator_stream_sink device.
class boost_filtering_streambuf_adaptor: public generator_stream {
public:
  // Construct with the default generator_stream buffer size.
  // @param cb       generator callback forwarded to generator_stream
  // @param filters  one or more boost::iostreams filters, pushed onto the
  //                 chain in argument order; the sink is pushed last
  template <class... Filter>
  boost_filtering_streambuf_adaptor(generator_cb cb, Filter&&... filters)
    : generator_stream(cb),
      _M_filtering_streambuf(),
      _M_sink(*this) {
    // Perfect-forward so rvalue filters can be moved, not copied, into push().
    push(std::forward<Filter>(filters)...);
  }
  // Construct with an explicit generator_stream buffer size.
  // @param buf_size  buffer size forwarded to generator_stream
  template <class... Filter>
  boost_filtering_streambuf_adaptor(generator_cb cb, std::streamsize buf_size, Filter&&... filters)
    : generator_stream(cb, buf_size),
      _M_filtering_streambuf(),
      _M_sink(*this) {
    push(std::forward<Filter>(filters)...);
  }
  // Terminal boost::iostreams Sink device: whatever the filter chain emits is
  // written straight back into the owning adaptor.
  // NOTE(review): boost copies the pushed sink, and each copy keeps a
  // reference to the adaptor — the adaptor must outlive the filter chain;
  // here the chain is a member of the adaptor itself, so that holds.
  class generator_stream_sink: public boost::iostreams::sink {
  public:
    generator_stream_sink(boost_filtering_streambuf_adaptor &adaptor): adaptor(adaptor) {}
    ~generator_stream_sink() {
#if STREAMING_LOG
      std::cerr << "[generator_stream_sink] destructing" << std::endl;
#endif
    }
    // Sink device contract: write n bytes starting at s; returns the count
    // written (always n — adaptor.write() is a stream insertion).
    std::streamsize write(const char* s, std::streamsize n)
    {
#if STREAMING_LOG
      std::cerr << "[generator_stream_sink] write n = " << n << std::endl;
#if STREAMING_DUMP
      std::cerr << "[generator_stream_sink] output = \n\"";
      std::cerr.write(s, n);
      std::cerr << "\" end of output" << std::endl;
#endif
#endif
      adaptor.write(s, n);
      return n;
    }
  protected:
    // Back-reference to the owning adaptor (non-owning; see lifetime note).
    boost_filtering_streambuf_adaptor &adaptor;
  };
  // Accessor for the terminal sink (last device in the chain).
  generator_stream_sink &sink() {
#if STREAMING_LOG
    std::cerr << "[generator_stream:sink()]" << std::endl;
#endif
    return _M_sink;
  }
  // Accessor for the underlying boost filter chain.
  boost::iostreams::filtering_streambuf<boost::iostreams::output> &get_filter() {
#if STREAMING_LOG
    std::cerr << "[generator_stream:get_filter()]" << std::endl;
#endif
    return _M_filtering_streambuf;
  }
  // generator_stream hook: no look-ahead is needed for a boost filter chain —
  // any pending input may be forwarded as-is. Parameters are unused here.
  bool do_peek(std::string &buf, char *&begin, char *&end) {
    return true;
  }
  // generator_stream hook: feed in_size raw bytes into the filter chain;
  // filtered bytes come back through generator_stream_sink::write().
  // @return number of bytes the chain accepted (sputn result)
  std::streamsize do_filter(char *in_beg, std::streamsize in_size) {
#if STREAMING_LOG
    std::cerr << "[generator_stream::do_filter()] forwarding to filtering_streambuf->sputn() in_size = " << in_size << std::endl;
#endif
    return _M_filtering_streambuf.sputn(in_beg, in_size);
  }
  // generator_stream hook: close the chain so every filter flushes its
  // buffered remainder through the sink.
  void do_close() {
    boost::iostreams::close(_M_filtering_streambuf);
  }
protected:
  // Recursion base case: terminate the chain with the adaptor's sink.
  void push() {
    get_filter().push(sink(), _M_generator_buf.buf_size());
  }
  // Push the head filter, then recurse on the tail.
  // Fix: the head is now perfect-forwarded — it was previously always passed
  // as an lvalue despite being taken by forwarding reference, forcing a copy
  // even for rvalue filters.
  template <class Head, class... Tail>
  void push(Head &&head, Tail&&... tail) {
    get_filter().push(std::forward<Head>(head), _M_generator_buf.buf_size());
    push(std::forward<Tail>(tail)...);
  }
  // Output-direction boost filter chain; devices/filters are copied in.
  boost::iostreams::filtering_streambuf<boost::iostreams::output> _M_filtering_streambuf;
  // Terminal sink, constructed against *this before any push() occurs.
  generator_stream_sink _M_sink;
};
#endif // BOOST_FILTER
#endif // STREAMING_SUPPORT
} // namespace nicexprs | |
#endif // NICEXPRS_H |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
GitHub issue on nicexprs.h