Skip to content

Instantly share code, notes, and snippets.

Created March 15, 2018 18:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/1160c11f8ed9c29b9184325191a3a63b to your computer and use it in GitHub Desktop.
Save anonymous/1160c11f8ed9c29b9184325191a3a63b to your computer and use it in GitHub Desktop.
An attempt to implement a read timeout on a synchronous boost::asio read in a multithreaded server.
#include <time.h>

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/optional.hpp>
using namespace boost::asio;
using namespace boost::asio::ip;
/// Abstract base for a TCP server: owns the listening acceptor and leaves the
/// accept strategy to subclasses through start().
class SimpleServer {
public:
    /// Opens the acceptor for the endpoint's protocol, enables address reuse,
    /// binds to `endpoint` and starts listening.
    ///
    /// @param ioservice  io_service the acceptor is registered with; only a
    ///                   reference is stored, so it must outlive this object.
    /// @param endpoint   local endpoint to bind to.
    SimpleServer(io_service& ioservice, const tcp::endpoint& endpoint)
        : ioservice_(ioservice)
        , acceptor_(ioservice_)
    {
        const tcp protocol = endpoint.protocol();
        acceptor_.open(protocol);

        const tcp::acceptor::reuse_address allowReuse(true);
        acceptor_.set_option(allowReuse);

        acceptor_.bind(endpoint);
        acceptor_.listen();
    }

    virtual ~SimpleServer() = default;

    /// Begins accepting connections; implemented by subclasses.
    virtual void start() = 0;

protected:
    // Declaration order matters: acceptor_ is constructed from ioservice_.
    io_service& ioservice_;
    tcp::acceptor acceptor_;
};
/// One client connection that performs a single read bounded by a one-second
/// deadline, presented to the caller as a blocking call.
///
/// start() launches an asynchronous read and an asynchronous deadline timer,
/// then drives the io_service until BOTH completion handlers have run.
class BlockingConnection : public boost::enable_shared_from_this<BlockingConnection> {
    boost::asio::strand strand_;
    tcp::socket socket_;

public:
    /// @param ioservice  io_service used for the socket and the strand.
    BlockingConnection(io_service& ioservice)
        : strand_(ioservice), socket_(ioservice)
    {}

    /// Socket to hand to the acceptor and to read from.
    tcp::socket& socket() {
        return socket_;
    }

    /// Strand through which this connection's handlers are dispatched.
    boost::asio::strand& strand() {
        return strand_;
    }

    /// Reads up to 1024 bytes from the socket, giving up after 1000 ms.
    ///
    /// Returns once the read handler and the timer handler have both finished
    /// (or the io_service has been stopped). Both handlers hold raw pointers
    /// to locals of this function, so it must not return while either of them
    /// can still run.
    void start() {
        std::vector<char> b(1024);
        io_service& io = socket_.get_io_service();
        deadline_timer timer(io);
        timer.expires_from_now(boost::posix_time::milliseconds(1000));

        // Outcome of the operation. Written by the handlers; safe to read here
        // once the corresponding "done" flag has been observed as true.
        boost::system::error_code ec;
        size_t bytesRead = 0;
        bool timedOut = false;

        // "Handler has finished" flags. They are atomic because the handlers
        // may be executed by another thread that is running the same
        // io_service, while this thread polls them.
        std::atomic<bool> timerDone(false);
        std::atomic<bool> readDone(false);

        timer.async_wait(strand_.wrap(
            boost::bind(&handleReadTimeout, &timedOut, &timerDone, &socket_, _1)));
        async_read(socket_, buffer(b), strand_.wrap(
            boost::bind(&handleReadComplete, &ec, &bytesRead, &readDone, &timer, _1, _2)));

        // Wait for both handlers. The flags are checked BEFORE touching the
        // io_service, and poll_one() is used instead of run_one(): if another
        // thread executes the last handler, a blocking run_one() here could
        // sleep forever with nothing left to dispatch.
        while (!(timerDone.load() && readDone.load())) {
            if (io.stopped()) {
                // No further handlers will be dispatched; polling would spin.
                break;
            }
            if (io.poll_one() == 0) {
                // Nothing ready for this thread; back off briefly.
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }
        std::cout << "BlockingConnection terminating " << std::endl;
    }

private:
    /// Completion handler for the deadline timer.
    ///
    /// On genuine expiry it records the timeout and cancels the pending socket
    /// operations so the read handler completes. When the timer was cancelled
    /// (operation_aborted) it only reports completion.
    ///
    /// @param ptimedOut  set to true when the deadline actually expired.
    /// @param pdone      set to true as the LAST action, signalling start()
    ///                   that this handler no longer touches its locals.
    /// @param sock       socket whose outstanding operations are cancelled.
    /// @param ec         result of the timer wait.
    static void handleReadTimeout(bool* ptimedOut, std::atomic<bool>* pdone, tcp::socket* sock, boost::system::error_code ec) {
        std::cout << "timer returned : " << ec.message() << std::endl;
        if (ec != boost::asio::error::operation_aborted) {
            *ptimedOut = true;
            // Non-throwing overload: an exception here would skip the
            // completion flag below and leave start() waiting.
            boost::system::error_code ignored;
            sock->lowest_layer().cancel(ignored);
        }
        pdone->store(true);
    }

    /// Completion handler for the read.
    ///
    /// Stores the read result and cancels the deadline timer.
    ///
    /// @param pec        receives the read's error code.
    /// @param pcount     receives the number of bytes transferred.
    /// @param pcomplete  set to true as the LAST action; the timer is
    ///                   cancelled first because start() may destroy it as
    ///                   soon as both flags are set.
    /// @param timer      deadline timer to cancel.
    /// @param ec         result of the read.
    /// @param count      bytes transferred.
    static void handleReadComplete(boost::system::error_code* pec, size_t* pcount, std::atomic<bool>* pcomplete, boost::asio::deadline_timer* timer, boost::system::error_code ec, size_t count) {
        std::cout << "Read returned : " << ec.message() << std::endl;
        *pec = ec;
        *pcount = count;
        boost::system::error_code ignored;
        timer->cancel(ignored);
        pcomplete->store(true);
    }
};
/// Server that accepts TCP connections one at a time and services each one
/// with a BlockingConnection before accepting the next.
///
/// Instances must be owned by a boost::shared_ptr, because start() calls
/// shared_from_this().
class BlockingServer : public SimpleServer, public boost::enable_shared_from_this<BlockingServer> {
public:
    /// @param ioservice  io_service used for the acceptor and all connections.
    /// @param endpoint   local endpoint to listen on.
    BlockingServer(io_service& ioservice, const tcp::endpoint& endpoint)
        : SimpleServer(ioservice, endpoint)
    {}

    /// Creates a fresh connection object and queues an asynchronous accept
    /// into its socket. The bound handler keeps both this server and the
    /// connection alive until the accept completes.
    void start() override {
        boost::shared_ptr<BlockingConnection> connection(new BlockingConnection(acceptor_.get_io_service()));
        acceptor_.async_accept(connection->socket(),
            boost::bind(&BlockingServer::handleAccept, shared_from_this(), connection,
                        boost::asio::placeholders::error));
    }

private:
    /// Completion handler for async_accept.
    ///
    /// @param connection  connection whose socket was passed to the accept.
    /// @param ec          result of the accept. On success the connection is
    ///                    serviced; on failure it is dropped without reading
    ///                    from its (unconnected) socket.
    void handleAccept(boost::shared_ptr<BlockingConnection> connection, const boost::system::error_code& ec) {
        if (ec) {
            std::cout << "accept failed : " << ec.message() << std::endl;
            if (ec == boost::asio::error::operation_aborted) {
                // The accept was cancelled; do not queue another one.
                return;
            }
        } else {
            connection->start();
        }
        // Queue the next accept only after the current connection is done.
        start();
    }
};
/// Resolves "localhost" together with the given port/service string and
/// returns the first endpoint the resolver reports.
///
/// @param portString  port number or service name, passed to the resolver query.
/// @return the first resolved TCP endpoint.
tcp::endpoint getEndpoint(const char* portString) {
    // A private io_service is enough: the resolve call below is synchronous.
    io_service resolverService;
    tcp::resolver lookup(resolverService);

    const tcp::resolver::query localhostQuery("localhost", portString);
    tcp::resolver::iterator firstMatch = lookup.resolve(localhostQuery);

    // The resolver entry converts implicitly to its endpoint.
    return *firstMatch;
}
void runServer(int threadCount) {
io_service ioservice;
boost::shared_ptr<BlockingServer> blockingServer(new BlockingServer(ioservice, getEndpoint("5555")));
blockingServer->start();
std::vector<std::thread> threads;
for (int i = 0; i < threadCount; i++) {
threads.push_back(std::thread(boost::bind(&io_service::run, &ioservice)));
}
for (int i = 0; i < threadCount; i++) {
threads[i].join();
}
}
/// Demo driver: runs the server on a background thread with two workers,
/// connects a client to it and never sends any data, so the server-side read
/// can only end through its timeout.
int main() {
    io_service ioservice;
    std::thread serverThread(runServer, 2);

    // Give the server a moment to start listening before connecting.
    // std::this_thread::sleep_for replaces POSIX sleep(), whose header
    // <unistd.h> this file never includes.
    std::this_thread::sleep_for(std::chrono::seconds(1));

    tcp::socket socket(ioservice);
    socket.connect(getEndpoint("5555"));
    /* And never send anything. */
    serverThread.join();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment