asio multichannel demo
# .gitignore
*~
.*~
.*.swp
*.o
test
run_client
run_server
// client.hpp
#pragma once
#include "listener.hpp"
namespace demo
{
using namespace boost;
using namespace asio;
using ip::tcp;
#include <boost/asio/yield.hpp>
class client : public coroutine
{
public:
explicit client(io_service& svc, std::string address, std::string port)
: address_(address), port_(port),
backchan1(svc, "localhost", "8001", [](std::string const& s) { return "backchan1: " + s; }),
backchan2(svc, "localhost", "8002", [](std::string const& s) { return "backchan2: " + s; }),
resolver_(make_shared<tcp::resolver>(svc)),
socket_ (make_shared<tcp::socket>(svc))
{
resolver_->async_resolve(tcp::resolver::query(address_, port_),
bind(&client::handle_resolve, this, placeholders::error, placeholders::iterator));
svc.post(backchan1);
svc.post(backchan2);
}
typedef void result_type;
void handle_resolve(const boost::system::error_code& err, tcp::resolver::iterator endpoint_iterator) {
if (!err) async_connect(*socket_, endpoint_iterator, bind(ref(*this), placeholders::error));
}
void operator()(system::error_code ec={}, std::size_t bytes_transferred={})
{
if (!ec) {
reenter (this) {
remote_endpoint = socket_->remote_endpoint().address().to_string() + ":" + std::to_string(socket_->remote_endpoint().port());
yield async_write(*socket_, buffer("hello world from demo client"), bind(ref(*this), placeholders::error, placeholders::bytes_transferred));
buffer_ = make_shared<array<char, 8192> >();
yield socket_->async_read_some(buffer(*buffer_), *this);
{
std::string incoming(buffer_->data(), bytes_transferred);
std::cerr << "listener " << socket_->local_endpoint() << ": '" << incoming << "' received from " << remote_endpoint << "\n";
}
socket_->shutdown(tcp::socket::shutdown_both, ec);
backchan1.shutdown();
backchan2.shutdown();
}
}
}
private:
std::string address_, port_;
std::string remote_endpoint;
listener backchan1, backchan2;
shared_ptr<tcp::resolver> resolver_;
shared_ptr<tcp::socket> socket_;
boost::shared_ptr<array<char, 8192> > buffer_;
};
#include <boost/asio/unyield.hpp>
}
// listener.hpp
#pragma once
#include <boost/asio.hpp>
#include <boost/bind.hpp>
#include <boost/make_shared.hpp>
#include <boost/function.hpp>
#include <boost/array.hpp>
#include <iostream>
namespace demo
{
using namespace boost;
using namespace asio;
using ip::tcp;
#include <boost/asio/yield.hpp>
template <class Derived> // CRTP for clonability
class basic_listener : public coroutine
{
protected:
using base_type = basic_listener<Derived>;
public:
explicit basic_listener(io_service& svc, std::string address, std::string port, function<std::string(Derived&, std::string const&)> handler)
: handler_(handler)
{
tcp::resolver resolver(svc);
tcp::resolver::query query(address, port);
acceptor_ = make_shared<tcp::acceptor>(svc, *resolver.resolve(query));
}
virtual ~basic_listener() {}
void operator()(system::error_code ec={}, std::size_t bytes_transferred={})
{
if (!ec) {
reenter (this) {
do {
socket_ = make_shared<tcp::socket>(acceptor_->get_io_service());
yield acceptor_->async_accept(*socket_, derived()); // pass `derived()` so the handler is a deep copy of the most-derived coroutine (the idiom is sketched after this header)
fork clone()();
} while (is_parent());
buffer_ = make_shared<array<char, 8192> >();
if (on_accept(*socket_))
{
yield socket_->async_read_some(buffer(*buffer_), derived());
{
std::string incoming(buffer_->data(), bytes_transferred);
response_ = make_shared<std::string>(handler_(derived(), incoming));
std::cerr << "listener " << socket_->local_endpoint() << ": '" << incoming << "' received from " << socket_->remote_endpoint() << "\n";
}
yield async_write(*socket_, buffer(*response_), derived());
} else
{
yield async_write(*socket_, buffer("Connection refused: on_accept couldn't complete the back connections"), derived());
}
socket_->shutdown(tcp::socket::shutdown_both, ec);
}
}
}
virtual bool on_accept(tcp::socket& socket) {
std::cerr << "listener " << socket.local_endpoint() << ": accepting connection from " << socket.remote_endpoint() << "\n";
return true;
}
void shutdown()
{
if (socket_ && socket_->is_open())
{
socket_->cancel();
socket_->shutdown(tcp::socket::shutdown_both);
}
if (acceptor_ && acceptor_->is_open())
{
acceptor_->cancel();
acceptor_->close();
}
}
protected:
Derived& derived() { return static_cast<Derived&>(*this); } // for deep copying of coroutine
Derived clone() { return Derived(derived()); }
function<std::string(Derived&, const std::string &)> handler_;
boost::shared_ptr<tcp::acceptor> acceptor_; // only in the listener
boost::shared_ptr<tcp::socket> socket_; // client connection
boost::shared_ptr<array<char, 8192> > buffer_;
boost::shared_ptr<std::string> response_;
};
struct listener : basic_listener<listener> {
explicit listener(io_service& svc, std::string address, std::string port, function<std::string(std::string const&)> handler)
: base_type(svc, address, port, [handler](listener&, std::string const& s) { return handler(s); })
{
}
};
#include <boost/asio/unyield.hpp>
}
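The accept loop in basic_listener above leans on the copyability of Boost.Asio's stackless coroutines: `fork` hands the freshly accepted socket to a copy of the coroutine, which falls out of the do/while and serves that connection, while the parent (`is_parent()`) loops back to accept the next client. The following stand-alone sketch shows just that idiom without the CRTP plumbing; the names (demo_sketch, echo_listener) are illustrative and not part of the gist.

// echo_listener.hpp -- hypothetical stand-alone illustration of the fork/is_parent() accept loop
#pragma once
#include <boost/asio.hpp>
#include <boost/make_shared.hpp>
#include <boost/array.hpp>
namespace demo_sketch
{
using namespace boost;
using namespace asio;
using ip::tcp;
#include <boost/asio/yield.hpp>
class echo_listener : public coroutine
{
public:
    explicit echo_listener(io_service& svc, unsigned short port)
        : acceptor_(make_shared<tcp::acceptor>(svc, tcp::endpoint(tcp::v4(), port)))
    {
    }
    void operator()(system::error_code ec={}, std::size_t bytes_transferred={})
    {
        if (!ec) {
            reenter (this) {
                do {
                    socket_ = make_shared<tcp::socket>(acceptor_->get_io_service());
                    yield acceptor_->async_accept(*socket_, *this);
                    fork echo_listener(*this)(); // the copy made here serves the accepted socket
                } while (is_parent());           // the parent loops back to accept again
                buffer_ = make_shared<array<char, 4096> >();
                yield socket_->async_read_some(buffer(*buffer_), *this);
                yield async_write(*socket_, buffer(buffer_->data(), bytes_transferred), *this);
                socket_->shutdown(tcp::socket::shutdown_both, ec);
            }
        }
    }
private:
    shared_ptr<tcp::acceptor> acceptor_;
    shared_ptr<tcp::socket> socket_;
    shared_ptr<array<char, 4096> > buffer_;
};
#include <boost/asio/unyield.hpp>
}

It would be started the same way the gist starts its server, e.g. svc.post(demo_sketch::echo_listener(svc, 8000)); followed by svc.run().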
# Makefile
all: test run_client run_server
CPPFLAGS+=-std=c++0x -Wall -pedantic
CPPFLAGS+=-g -O1
CPPFLAGS+=-isystem ~/custom/boost_1_55_0
LDFLAGS+=-L ~/custom/boost_1_55_0/stage/lib/ -Wl,-rpath,/home/sehe/custom/boost_1_55_0/stage/lib
LDFLAGS+=-lboost_system -lboost_thread -lpthread
%:%.cpp *.hpp
	$(CXX) $(CPPFLAGS) $< -o $@ $(LDFLAGS)
// run_client.cpp
#include <boost/asio.hpp>
#include "server.hpp"
#include "client.hpp"
int main()
{
boost::asio::io_service svc;
std::cerr << "Starting a test client that sends a message...\n";
demo::client client(svc, "localhost", "8000");
svc.run();
}
// run_server.cpp
#include <boost/asio.hpp>
#include "server.hpp"
#include "client.hpp"
int main()
{
boost::asio::io_service svc;
svc.post(demo::server(svc));
svc.run();
}
// server.hpp
#pragma once
#include "listener.hpp"
namespace demo
{
using namespace boost;
using namespace asio;
using ip::tcp;
struct server : basic_listener<server>
{
server(io_service& svc) : basic_listener<server>(svc, "localhost", "8000", &server::handler)
{
}
private:
static std::string handler(server& session, std::string const& message)
{
session.do_back_chatter(message);
return "ECHO " + message;
}
static void hold(shared_ptr<std::string>) {} // trick to keep the message buffers alive until the async writes complete
void do_back_chatter(std::string const& message)
{
auto msg1 = make_shared<std::string>("We've received a request of length " + std::to_string(message.length()));
auto msg2 = make_shared<std::string>("We're handling it in ");
*msg2 += __PRETTY_FUNCTION__;
backsock1->async_write_some(buffer(*msg1), bind(hold, msg1));
backsock2->async_write_some(buffer(*msg2), bind(hold, msg2));
}
virtual bool on_accept(tcp::socket& socket) override
{
auto host = socket.remote_endpoint().address().to_string();
// For now, setting up the back connections is fully synchronous; that may not
// work well in practice (scaling, latency). An asynchronous variant is sketched after this header.
try
{
tcp::resolver resolver(socket.get_io_service());
auto ep1 = resolver.resolve(tcp::resolver::query(host, "8001"));
auto ep2 = resolver.resolve(tcp::resolver::query(host, "8002"));
backsock1 = make_shared<tcp::socket>(socket.get_io_service());
backsock2 = make_shared<tcp::socket>(socket.get_io_service());
backsock1->connect(*ep1);
backsock2->connect(*ep2);
std::cerr << "on_accept: back channels connected for " << host << "\n";
} catch(std::exception const& e)
{
std::cerr << "on_accept: '" << e.what() << "' for " << host << "\n";
return false;
}
return base_type::on_accept(socket);
}
private:
shared_ptr<tcp::socket> backsock1, backsock2;
};
}
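The comment in server::on_accept notes that the back connections are set up synchronously, which may not scale well. A possible asynchronous variant, shown only as a sketch (the helper name and the lambda handlers are mine, not part of the gist), resolves and connects a back channel without blocking the accept path; the server would then have to defer its on_accept decision until both connects complete, which is presumably why the gist keeps it synchronous for now.

// back_channel_sketch.hpp -- hypothetical helper, not part of the gist
#pragma once
#include <boost/asio.hpp>
#include <boost/make_shared.hpp>
#include <iostream>
namespace demo_sketch
{
using namespace boost;
using namespace asio;
using ip::tcp;
// Resolve host:port and connect the given socket without blocking the caller.
inline void async_open_back_channel(shared_ptr<tcp::socket> sock,
                                    std::string const& host, std::string const& port)
{
    auto resolver = make_shared<tcp::resolver>(sock->get_io_service());
    resolver->async_resolve(tcp::resolver::query(host, port),
        [resolver, sock](system::error_code ec, tcp::resolver::iterator it) {
            if (ec) { std::cerr << "back channel resolve: " << ec.message() << "\n"; return; }
            async_connect(*sock, it,
                [sock](system::error_code ec, tcp::resolver::iterator) {
                    if (ec) std::cerr << "back channel connect: " << ec.message() << "\n";
                });
        });
}
}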
#include <boost/asio.hpp>
#include <boost/thread.hpp>
#include "server.hpp"
#include "client.hpp"
int main()
{
boost::asio::io_service svc;
// start service on a separate thread
boost::thread th([&svc] {
svc.post(demo::server(svc));
svc.run();
});
boost::this_thread::sleep_for(boost::chrono::milliseconds(500)); // allow server to start accepting
// post client traffic to the service
std::cerr << "Starting a test client that sends a message...\n";
demo::client client(svc, "localhost", "8000");
// await interrupt (or new connections)
th.join();
}