Skip to content

Instantly share code, notes, and snippets.

@stdk
Created November 15, 2013 23:32
Show Gist options
  • Save stdk/7493541 to your computer and use it in GitHub Desktop.
Save stdk/7493541 to your computer and use it in GitHub Desktop.
asio with coroutines
#include <boost/coroutine/all.hpp>
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/noncopyable.hpp>
#include <exception>
#include <iostream>
#include <streambuf>
#include <string>
// Ask the linker to pull in the Boost.Coroutine static library; the file name
// encodes the vc110 toolset and Boost 1_54 (this pragma form is an MSVC feature).
#pragma comment(lib, "libboost_coroutine-vc110-mt-1_54.lib")
// Coroutine type used throughout this file: each resume call passes an
// (error_code, byte count) pair into the coroutine body.
typedef boost::coroutines::coroutine<void (boost::system::error_code, size_t)> coroutine_t;
/// Stream buffer that feeds a std::istream from an asio TCP socket while
/// running inside a coroutine.
///
/// When the get area is exhausted, the buffer starts an asynchronous receive
/// and yields through `caller`; the receive's completion handler resumes the
/// coroutine through `coroutine`, handing over the error code and byte count.
///
/// Only references are stored: the socket, the coroutine and the caller must
/// all outlive this object.
class inbuf : public std::streambuf, private boost::noncopyable {
    char buffer[64];                     // backing store for the get area
    boost::asio::ip::tcp::socket &socket;
    coroutine_t &coroutine;              // resumed by the receive completion handler
    coroutine_t::caller_type &caller;    // used to yield and to read the resume arguments

    /// Empties the get area (gptr() == egptr() == nullptr).
    void reset_get_area() {
        setg(nullptr, nullptr, nullptr);
    }

    /// Receives the next chunk of data from the socket into `buffer`.
    ///
    /// Starts one asynchronous receive, yields until its completion handler
    /// resumes the coroutine, then installs the received bytes as the new get
    /// area.
    ///
    /// @return true when at least one byte is available in the get area;
    ///         false on a receive error or when no bytes were transferred.
    bool fetch() {
        std::cout << "fetch" << std::endl;
        socket.async_receive(boost::asio::buffer(buffer),
            [this](boost::system::error_code const &error, size_t bytes_transferred) {
                std::cerr << "+ " << bytes_transferred << " bytes" << std::endl;
                coroutine(error, bytes_transferred);
            });

        // Yield; execution continues here once the handler above resumes us.
        caller();

        boost::system::error_code error;
        size_t bytes_transferred = 0;
        boost::tie(error, bytes_transferred) = caller.get();

        if (error) {
            std::cout << error.message() << std::endl;
            reset_get_area();
            return false;
        }

        std::cout << "bytes_transferred: " << bytes_transferred << std::endl;

        // A successful completion with zero bytes leaves nothing to read.
        // Report it as end-of-stream instead of installing an empty get area
        // that underflow() would then dereference.
        if (bytes_transferred == 0) {
            reset_get_area();
            return false;
        }

        setg(buffer, buffer, buffer + bytes_transferred);
        return true;
    }

public:
    /// Returns the current character without consuming it, refilling the get
    /// area from the socket when it is empty.
    ///
    /// @return the character at gptr() converted to int_type, or
    ///         traits_type::eof() when no more data could be received.
    int_type underflow() override {
        if (gptr() >= egptr() && !fetch()) {
            return traits_type::eof();
        }
        return traits_type::to_int_type(*gptr());
    }

    /// @param _socket     socket to receive from
    /// @param _coroutine  coroutine object the completion handler resumes
    /// @param _caller     caller handle of that coroutine, used to yield
    inbuf(boost::asio::ip::tcp::socket &_socket, coroutine_t &_coroutine, coroutine_t::caller_type &_caller)
        : socket(_socket), coroutine(_coroutine), caller(_caller)
    {
        // Start with an empty get area so the first read triggers fetch().
        reset_get_area();
    }
};
class session : private boost::noncopyable {
boost::asio::ip::tcp::socket socket;
coroutine_t coroutine;
void handle_error(boost::system::error_code const &error) {
std::cout << error.message() << std::endl;
delete this;
}
public:
auto get_socket() -> std::add_lvalue_reference<decltype(socket)>::type {
return socket;
}
void start() {
std::cout << "session: " << socket.remote_endpoint() << std::endl;
coroutine = coroutine_t([this](coroutine_t::caller_type &caller) {
std::cout << "coroutine" << std::endl;
inbuf buffer(socket, coroutine, caller);
std::istream stream(&buffer);
std::string msg = "";
do {
std::getline(stream, msg);
std::cout << msg << std::endl;
} while(msg != "exit\r" && msg != "");
socket.get_io_service().post([this]() {
delete this;
});
});
}
session(boost::asio::io_service &io_svc):socket(io_svc) {
std::cout << "session" << std::endl;
}
~session() {
std::cout << "~session" << std::endl;
}
};
/// TCP acceptor that creates one `session` per incoming connection.
///
/// Instances are created through create() and managed by boost::shared_ptr;
/// every pending accept handler holds a shared_ptr to the server, which keeps
/// it alive for as long as an accept is outstanding.
class server : public boost::enable_shared_from_this<server> {
    boost::asio::ip::tcp::acceptor acceptor;

    /// Opens an IPv4 acceptor on `port`. Private: use create().
    server(boost::asio::io_service &io_svc, short port)
        : acceptor(io_svc,
                   boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) {
    }

public:
    typedef boost::shared_ptr<server> ptr_t;

    /// Creates a server listening on `port`, owned by the returned pointer.
    static ptr_t create(boost::asio::io_service &io_svc, short port) {
        return ptr_t(new server(io_svc, port));
    }

    /// Starts one asynchronous accept; the handler re-arms the accept after
    /// every completion, successful or not.
    void start() {
        // Take the shared_ptr first: shared_from_this() may throw, and doing
        // it before the allocation below means nothing is leaked if it does.
        ptr_t srv(this->shared_from_this());
        session *new_session = new session(acceptor.get_io_service());

        acceptor.async_accept(new_session->get_socket(),
            [srv, new_session](boost::system::error_code const &error) {
                if (!error) {
                    // From here on the session manages its own lifetime.
                    new_session->start();
                } else {
                    std::cout << error.message() << std::endl;
                    // The session never started, so nothing else will ever
                    // delete it; release it here to avoid a leak.
                    delete new_session;
                }
                srv->start();
            });
    }
};
/// Runs the server on port 1000 until the io_service runs out of work.
///
/// @return 0 on normal termination, 1 if an exception escaped the event loop
///         (for example when the acceptor cannot be opened on the port).
int main(int argc, char ** argv) {
    // Command-line arguments are not used.
    (void)argc;
    (void)argv;

    try {
        boost::asio::io_service io_svc;
        // The server is created from inside the event loop; the local
        // shared_ptr may go out of scope because start() stores its own copy
        // in the pending accept handler.
        io_svc.post([&io_svc]() {
            server::ptr_t srv(server::create(io_svc, 1000));
            srv->start();
        });
        io_svc.run();
    } catch (std::exception const &e) {
        // Without this handler an exception thrown from any handler would
        // leave main() and terminate the process without a diagnostic.
        std::cerr << "fatal: " << e.what() << std::endl;
        return 1;
    }
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment