-
-
Save ra1u/ea36d9c1e51fc7538974fe8a29ed991f to your computer and use it in GitHub Desktop.
asio beast pipelined server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <algorithm> | |
#include <boost/asio/bind_executor.hpp> | |
#include <boost/asio/ip/tcp.hpp> | |
#include <boost/asio/signal_set.hpp> | |
#include <boost/asio/strand.hpp> | |
#include <boost/beast/core.hpp> | |
#include <boost/beast/http.hpp> | |
#include <boost/beast/version.hpp> | |
#include <boost/config.hpp> | |
#include <cstdlib> | |
#include <functional> | |
#include <iostream> | |
#include <memory> | |
#include <string> | |
//#include <thread> | |
#include <vector> | |
using tcp = boost::asio::ip::tcp; // from <boost/asio/ip/tcp.hpp> | |
namespace http = boost::beast::http; // from <boost/beast/http.hpp> | |
namespace { | |
template <typename T> std::shared_ptr<T> move_to_shared(T &&v) | |
{ | |
return std::make_shared<T>(std::forward<T>(v)); | |
} | |
// This function produces an HTTP response for the given | |
// request. The type of the response object depends on the | |
// contents of the request, so the interface requires the | |
// caller to pass a generic lambda for receiving the response. | |
template <class Body, class Allocator, class Send> | |
static void handle_request(http::request<Body, http::basic_fields<Allocator>> &&req, | |
Send &&send) | |
{ | |
// Returns a bad request response | |
auto const bad_request = [&req](boost::beast::string_view why) { | |
http::response<http::string_body> res{http::status::bad_request, | |
req.version()}; | |
res.set(http::field::server, BOOST_BEAST_VERSION_STRING); | |
res.set(http::field::content_type, "text/html"); | |
res.keep_alive(req.keep_alive()); | |
res.body() = why.to_string(); | |
res.prepare_payload(); | |
return res; | |
}; | |
// Cache the size since we need it after the move | |
if (req.method() == http::verb::put) { | |
// Respond echo to PUT request | |
http::response<http::string_body> res{http::status::ok, req.version()}; | |
res.body() = std::move(req.body()); | |
res.prepare_payload(); | |
return send(std::move(res)); | |
} | |
return send(bad_request("respond only put")); | |
} | |
//------------------------------------------------------------------------------ | |
// Report a failure | |
void fail(boost::system::error_code ec, char const *what) | |
{ | |
std::cerr << what << ": " << ec.message() << "\n"; | |
} | |
// Handles an HTTP server connection | |
class session : public std::enable_shared_from_this<session> { | |
tcp::socket socket_; | |
// boost::asio::strand<boost::asio::io_context::executor_type> strand_; | |
boost::beast::flat_buffer buffer_; | |
http::request<http::string_body> req_; | |
std::shared_ptr<void> res_; | |
public: | |
// Take ownership of the socket | |
explicit session(tcp::socket socket) | |
: socket_(std::move(socket)) | |
{ | |
//socket_.set_option(tcp::no_delay(true)); | |
} | |
// Start the asynchronous operation | |
void run() | |
{ | |
do_read(); | |
} | |
private: | |
void do_read() | |
{ | |
// Make the request empty before reading, | |
// otherwise the operation behavior is undefined. | |
req_ = {}; | |
// Read a request | |
auto self = shared_from_this(); | |
http::async_read(socket_, buffer_, req_, [this, self](boost::system::error_code ec, | |
std::size_t size) { | |
on_read(ec, size); | |
}); | |
} | |
void on_read(boost::system::error_code ec, std::size_t bytes_transferred) | |
{ | |
boost::ignore_unused(bytes_transferred); | |
// This means they closed the connection | |
if (ec == http::error::end_of_stream) | |
return do_close(); | |
if (ec) | |
return fail(ec, "read"); | |
// Send the response | |
auto self = shared_from_this(); | |
handle_request(std::move(req_), [self, this](auto &&msg) { | |
auto sp = move_to_shared(std::move(msg)); | |
// Write the response | |
auto &s = *sp; | |
http::async_write( | |
socket_, s, [sp, self, this](boost::system::error_code ec, | |
std::size_t bytes_transferred) { | |
on_write(ec, bytes_transferred); | |
}); | |
}); | |
} | |
void on_write(boost::system::error_code ec, std::size_t bytes_transferred) | |
{ | |
boost::ignore_unused(bytes_transferred); | |
if (ec) | |
return fail(ec, "write"); | |
else { | |
// We're done with the response so delete it | |
res_ = nullptr; | |
// Read another request | |
do_read(); | |
} | |
} | |
void do_close() | |
{ | |
// Send a TCP shutdown | |
boost::system::error_code ec; | |
socket_.shutdown(tcp::socket::shutdown_send, ec); | |
// At this point the connection is closed gracefully | |
} | |
}; | |
//------------------------------------------------------------------------------ | |
// Accepts incoming connections and launches the sessions | |
class listener : public std::enable_shared_from_this<listener> { | |
tcp::acceptor acceptor_; | |
tcp::socket socket_; | |
public: | |
listener(boost::asio::io_context &ioc, tcp::endpoint endpoint) | |
: acceptor_(ioc), socket_(ioc) | |
{ | |
boost::system::error_code ec; | |
// Open the acceptor | |
acceptor_.open(endpoint.protocol(), ec); | |
if (ec) { | |
fail(ec, "open"); | |
return; | |
} | |
// Allow address reuse | |
acceptor_.set_option(boost::asio::socket_base::reuse_address(true), ec); | |
if (ec) { | |
fail(ec, "set_option"); | |
return; | |
} | |
// Bind to the server address | |
acceptor_.bind(endpoint, ec); | |
if (ec) { | |
fail(ec, "bind"); | |
return; | |
} | |
// Start listening for connections | |
acceptor_.listen(boost::asio::socket_base::max_listen_connections, ec); | |
if (ec) { | |
fail(ec, "listen"); | |
return; | |
} | |
} | |
// Start accepting incoming connections | |
void run() | |
{ | |
if (!acceptor_.is_open()) | |
return; | |
do_accept(); | |
} | |
private: | |
void do_accept() | |
{ | |
acceptor_.async_accept(socket_, std::bind(&listener::on_accept, | |
shared_from_this(), | |
std::placeholders::_1)); | |
} | |
void on_accept(boost::system::error_code ec) | |
{ | |
std::cout << "accept\n"; | |
if (ec) { | |
fail(ec, "accept"); | |
} | |
else { | |
// Create the session and run it | |
std::make_shared<session>(std::move(socket_))->run(); | |
} | |
// Accept another connection | |
do_accept(); | |
} | |
}; | |
} // namespace | |
//------------------------------------------------------------------------------ | |
int main(int argc, char *argv[]) | |
{ | |
auto const address = boost::asio::ip::make_address("127.0.0.1"); | |
auto const port = static_cast<unsigned short>(8080); | |
// The io_context is required for all I/O | |
boost::asio::io_context ioc{1}; //{threads}; | |
// Create and launch a listening port | |
std::make_shared<listener>(ioc, tcp::endpoint{address, port})->run(); | |
// Construct a signal set registered for process termination. | |
boost::asio::signal_set signals(ioc, SIGINT, SIGTERM); | |
signals.async_wait( | |
[&ioc](const boost::system::error_code &error, int signal_number) { | |
if (!error) { | |
// A signal occurred. | |
ioc.stop(); | |
} | |
}); | |
ioc.run(); | |
return EXIT_SUCCESS; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment