Created
March 15, 2018 18:49
-
-
Save anonymous/1160c11f8ed9c29b9184325191a3a63b to your computer and use it in GitHub Desktop.
Attempt to implement a read timeout on a synchronous boost::asio read in a multithreaded server.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <time.h>

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/optional.hpp>
using namespace boost::asio; | |
using namespace boost::asio::ip; | |
class SimpleServer { | |
protected: | |
io_service& ioservice_; | |
tcp::acceptor acceptor_; | |
public: | |
SimpleServer(io_service& ioservice, const tcp::endpoint& endpoint) | |
: ioservice_(ioservice), | |
acceptor_(ioservice_) | |
{ | |
acceptor_.open(endpoint.protocol()); | |
acceptor_.set_option(tcp::acceptor::reuse_address(true)); | |
acceptor_.bind(endpoint); | |
acceptor_.listen(); | |
} | |
virtual ~SimpleServer() {} | |
virtual void start() = 0; | |
}; | |
/**
 * A single accepted TCP connection that performs one "blocking" read with a
 * one-second deadline.
 *
 * The read and a deadline timer are both started asynchronously; start()
 * then pumps the io_service with run_one() until BOTH completion handlers
 * have run. Whichever operation finishes first cancels the other, so the
 * second handler is delivered promptly (normally with operation_aborted).
 */
class BlockingConnection : public boost::enable_shared_from_this<BlockingConnection> {
    boost::asio::strand strand_;
    tcp::socket socket_;

public:
    BlockingConnection(io_service& ioservice)
        : strand_(ioservice), socket_(ioservice)
    {}

    /** Socket for the acceptor to accept into. */
    tcp::socket& socket() {
        return socket_;
    }

    /** Strand through which this connection's handlers are wrapped. */
    boost::asio::strand& strand() {
        return strand_;
    }

    /**
     * Try to read 1024 bytes from the socket, giving up after 1000 ms.
     *
     * Returns once both the timer handler and the read handler have run, or
     * once run_one() reports that the io_service has stopped. While waiting,
     * run_one() may also execute unrelated handlers queued on the same
     * io_service.
     */
    void start() {
        std::vector<char> b(1024);
        deadline_timer timer(socket_.get_io_service());
        timer.expires_from_now(boost::posix_time::milliseconds(1000));

        // Outcome of the read, filled in by handleReadComplete.
        boost::system::error_code ec;
        size_t bytesRead = 0;

        // The handlers may run on any thread that is inside io_service::run(),
        // while this loop polls the flags outside the strand, so the flags
        // must be atomic.
        std::atomic<bool> timedOut(false);      // deadline expired before the read finished
        std::atomic<bool> timerDone(false);     // timer handler has run (expired or aborted)
        std::atomic<bool> readComplete(false);  // read handler has run (success or error)

        timer.async_wait(strand_.wrap(
            boost::bind(&BlockingConnection::handleReadTimeout,
                        &timedOut, &timerDone, &readComplete, &socket_, _1)));
        async_read(socket_, buffer(b), strand_.wrap(
            boost::bind(&BlockingConnection::handleReadComplete,
                        &ec, &bytesRead, &readComplete, &timer, _1, _2)));

        // Both handlers hold pointers to the locals above, so do not leave
        // this frame until each of them has been invoked. Waiting on
        // "timer handler ran" rather than "deadline expired" lets the loop
        // finish after a successful read as well as after a timeout.
        while (socket_.get_io_service().run_one()) {
            if (timerDone && readComplete) {
                break;
            }
        }
        std::cout << "BlockingConnection terminating " << std::endl;
    }

private:
    /**
     * Completion handler for the deadline timer.
     *
     * On a genuine expiry, records the timeout and cancels the outstanding
     * read. Always marks the timer handler as done so start() can return.
     */
    static void handleReadTimeout(std::atomic<bool>* ptimedOut,
                                  std::atomic<bool>* ptimerDone,
                                  const std::atomic<bool>* preadComplete,
                                  tcp::socket* sock,
                                  boost::system::error_code ec) {
        std::cout << "timer returned : " << ec.message() << std::endl;
        // The timer may have expired just before the read handler cancelled
        // it; in that case this handler still arrives with a success code, so
        // also check whether the read already finished.
        const bool aborted = (ec == boost::asio::error::operation_aborted);
        if (!aborted && !*preadComplete) {
            *ptimedOut = true;
            sock->lowest_layer().cancel();
        }
        *ptimerDone = true;
    }

    /**
     * Completion handler for the read.
     *
     * Stores the result, marks the read as finished and cancels the timer so
     * its handler is delivered without waiting for the deadline.
     */
    static void handleReadComplete(boost::system::error_code* pec,
                                   size_t* pcount,
                                   std::atomic<bool>* pcomplete,
                                   boost::asio::deadline_timer* timer,
                                   boost::system::error_code ec,
                                   size_t count) {
        std::cout << "Read returned : " << ec.message() << std::endl;
        *pec = ec;
        *pcount = count;
        // Publish the result before raising the flag the waiting loop polls.
        *pcomplete = true;
        timer->cancel();
    }
};
/**
 * Server that accepts one connection at a time and services it by calling
 * BlockingConnection::start() from inside the accept handler.
 *
 * Must be owned by a boost::shared_ptr, because start() binds
 * shared_from_this() into the accept handler.
 */
class BlockingServer : public SimpleServer, public boost::enable_shared_from_this<BlockingServer> {
public:
    BlockingServer(io_service& ioservice, const tcp::endpoint& endpoint)
        : SimpleServer(ioservice, endpoint)
    {}

    /** Create a fresh connection object and queue an asynchronous accept into it. */
    void start() override {
        boost::shared_ptr<BlockingConnection> connection(new BlockingConnection(acceptor_.get_io_service()));
        acceptor_.async_accept(
            connection->socket(),
            boost::bind(&BlockingServer::handleAccept, shared_from_this(), connection,
                        boost::asio::placeholders::error));
    }

private:
    /**
     * Accept completion handler.
     *
     * @param connection connection object whose socket was accepted into.
     * @param ec         result of the accept operation.
     */
    void handleAccept(boost::shared_ptr<BlockingConnection> connection,
                      const boost::system::error_code& ec) {
        if (ec == boost::asio::error::operation_aborted) {
            // The accept was cancelled; do not queue another one.
            return;
        }
        if (ec) {
            // The socket is not connected, so there is nothing to read from.
            std::cerr << "accept failed : " << ec.message() << std::endl;
        } else {
            connection->start();
        }
        // Re-arm only after the connection has been serviced.
        start();
    }
};
/**
 * Resolve "localhost" with the given service/port string and return the
 * first endpoint the resolver yields.
 *
 * Uses the throwing resolve() overload, so a resolution failure surfaces as
 * an exception from this function.
 *
 * @param portString port number or service name, e.g. "5555".
 */
tcp::endpoint getEndpoint(const char* portString) {
    io_service resolverService;
    tcp::resolver resolver(resolverService);
    const tcp::resolver::query localhostQuery("localhost", portString);
    tcp::resolver::iterator firstMatch = resolver.resolve(localhostQuery);
    return firstMatch->endpoint();
}
void runServer(int threadCount) { | |
io_service ioservice; | |
boost::shared_ptr<BlockingServer> blockingServer(new BlockingServer(ioservice, getEndpoint("5555"))); | |
blockingServer->start(); | |
std::vector<std::thread> threads; | |
for (int i = 0; i < threadCount; i++) { | |
threads.push_back(std::thread(boost::bind(&io_service::run, &ioservice))); | |
} | |
for (int i = 0; i < threadCount; i++) { | |
threads[i].join(); | |
} | |
} | |
int main() { | |
io_service ioservice; | |
std::thread serverThread(boost::bind(runServer, 2)); | |
sleep(1); | |
tcp::socket socket(ioservice); | |
socket.connect(getEndpoint("5555")); | |
/* And never send anything. */ | |
serverThread.join(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment