Skip to content

Instantly share code, notes, and snippets.

Created May 18, 2011 18:37
Show Gist options
  • Save anonymous/979212 to your computer and use it in GitHub Desktop.
Save anonymous/979212 to your computer and use it in GitHub Desktop.
Problems using boost::asio::strand
#include <cassert>
#include <cstring>
#include <exception>
#include <iostream>
#include <map>
#include <stdexcept>
#include <string>
#include <vector>
#include <boost/asio.hpp>
#include <boost/signals.hpp>
#include <boost/noncopyable.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/thread/locks.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <boost/bind.hpp>
/* inlined private headers */
// String helpers whose definitions are not part of this listing.
// NOTE(review): EagerConnection::open(dest) treats a return value of 2 from
// split() as "src was divided at ch into oLeft and oRight" -- confirm against
// the definition.
int split(std::string const &src, char ch, std::string &oLeft, std::string &oRight);
// The helpers below are only declared here and are not called anywhere in this
// listing; consult their definitions for exact semantics.
void trim(std::string &str);
void munge(std::string &key);
std::string counter_filename(std::string const &ctr);
int querystring(std::string const &in, std::map<std::string, std::string> &oQuery);
void urldecode(std::string &str);
bool str_pat_match(std::string const &str, std::string const &pat);
// grab: scoped guard that holds a boost::recursive_mutex for as long as the
// guard object is alive.
typedef boost::lock_guard<boost::recursive_mutex> grab;
// lock: the (recursive) mutex type that grab operates on.
typedef boost::recursive_mutex lock;
/* class declaration */
// EagerConnection will attempt to establish a connection to a remote side.
// When opened with a "host:port" destination it resolves the name, connects,
// and re-tries with a growing delay (see tryLater()) after failures. Received
// bytes are accumulated in an internal buffer that callers drain with
// readIn()/peekIn()/consume(); bytes passed to writeOut() are queued and
// written asynchronously.
//
class EagerConnection : public boost::noncopyable, public boost::enable_shared_from_this<EagerConnection>
{
public:
// Byte container used for all internal queues.
typedef std::vector<char> buffer;
// Creates a connection object on svc that is not yet opened.
EagerConnection(boost::asio::io_service &svc);
// Creates a connection object on svc and immediately calls open(dest).
EagerConnection(std::string const &dest, boost::asio::io_service &svc);
// Calls close().
~EagerConnection();
// Marks the connection as opened and starts reading from socket_
// (used when the socket was accepted rather than connected).
void open();
// Closes any current connection, then resolves and connects to dest, which
// must have the form "host:port"; throws std::runtime_error otherwise.
void open(std::string const &dest);
// If opened: closes the socket, cancels the retry timer and discards all
// buffered data. Does nothing when not opened.
void close();
// Returns the opened flag.
bool opened();
// Number of bytes queued in outgoing_ that have not yet been handed to a write.
size_t pendingOut();
// Number of received bytes waiting in incoming_.
size_t pendingIn();
// Copies up to maxSize received bytes into ptr and removes them from the
// incoming buffer (peekIn() followed by consume()).
size_t readIn(void *ptr, size_t maxSize);
// Copies up to maxSize received bytes into ptr without removing them and
// returns the count; when ptr is null nothing is copied but the count is
// still returned.
size_t peekIn(void *ptr, size_t maxSize);
// Removes the first n received bytes; throws std::runtime_error when n
// exceeds the amount buffered.
void consume(size_t n);
// Appends size bytes from data to the outgoing queue and starts a write if
// none is in flight; throws std::runtime_error when not opened.
void writeOut(void const *data, size_t size);
// Closes the socket, discards all buffered data and emits onDisconnect_.
void abort();
// Returns endpoint_, which on_resolve() sets to the resolved remote endpoint.
boost::asio::ip::tcp::endpoint endpoint() const;
// Emitted from on_connect() after a successful connect.
boost::signal<void ()> onConnect_;
// Emitted from abort() and from the read/write handlers on socket errors.
boost::signal<void ()> onDisconnect_;
// Emitted from on_read() after each successful read.
boost::signal<void ()> onData_;
private:
friend class EagerConnectionFactory;
// Schedules startResolve() on timer_ after backoff_ seconds and grows backoff_.
void tryLater();
// Resolves resolveHost_/resolvePort_ synchronously and forwards to on_resolve().
void startResolve();
void on_resolve(boost::system::error_code const &err, boost::asio::ip::tcp::resolver::iterator endpoint);
// Starts an asynchronous connect; completion is delivered to on_connect().
void startConnect(boost::asio::ip::tcp::endpoint endpoint);
void on_connect(boost::system::error_code const &err, boost::asio::ip::tcp::endpoint endpoint);
// Moves outgoing_ into writeBuf_ and starts an asynchronous write if none is pending.
void startWrite();
void on_write(boost::system::error_code const &err, size_t xfer);
// Starts an asynchronous read of at least one byte into inBuf_.
void startRead();
void on_read(boost::system::error_code const &err, size_t xfer);
// Shared constructor set-up.
void init();
// True between open() and close().
bool opened_;
// True while an async_write started by startWrite() has not completed.
bool writePending_;
// Delay in seconds before the next reconnect attempt.
int backoff_;
// Counter that on_read() uses to assert the onData_ emission is not re-entered.
int interlock_;
boost::asio::ip::tcp::socket socket_;
boost::asio::ip::tcp::resolver resolver_;
// Retry timer armed by tryLater().
boost::asio::deadline_timer timer_;
// All completion handlers in this class are wrapped with this strand.
boost::asio::strand strand_;
// Held by the member functions while they touch the four buffers below.
lock mutex_;
// Data handed to the async_write currently in flight.
buffer writeBuf_;
// Data queued by writeOut() and not yet handed to a write.
buffer outgoing_;
// Scratch area that async_read fills.
buffer inBuf_;
// Received data waiting to be read by the caller.
buffer incoming_;
std::string resolveHost_;
std::string resolvePort_;
boost::asio::ip::tcp::endpoint endpoint_;
};
/* class definition */
// Lets the definitions below write tcp::endpoint / tcp::resolver.
using boost::asio::ip::tcp;
// Brings boost::asio names (e.g. placeholders::error) into scope for the
// boost::bind expressions below.
using namespace boost::asio;
// Creates an unopened connection whose socket, resolver, timer and strand all
// run on svc.
//
// Every scalar member is given a defined value here, listed in declaration
// order: writePending_ used to be left uninitialised (startWrite() reads it
// before anything writes it), and interlock_ used to be listed after members
// that are declared later in the class.
EagerConnection::EagerConnection(boost::asio::io_service &svc) :
    opened_(false),
    writePending_(false),
    backoff_(1),
    interlock_(0),
    socket_(svc),
    resolver_(svc),
    timer_(svc),
    strand_(svc)
{
    init();
}
// Creates a connection on svc and immediately starts connecting to dest
// ("host:port"). Propagates the std::runtime_error thrown by open(dest) when
// dest is malformed.
//
// interlock_ and writePending_ were previously never initialised on this
// path, although on_read() asserts that interlock_ starts at 0 and
// startWrite() branches on writePending_; both now get defined values.
//
// NOTE(review): open(dest) starts asynchronous operations whose handlers bind
// a raw `this`; the object presumably has to outlive them -- confirm with the
// code that owns EagerConnection instances.
EagerConnection::EagerConnection(std::string const &dest, boost::asio::io_service &svc) :
    opened_(false),
    writePending_(false),
    backoff_(1),
    interlock_(0),
    socket_(svc),
    resolver_(svc),
    timer_(svc),
    strand_(svc)
{
    init();
    open(dest);
}
void EagerConnection::init()
{
opened_ = false;
backoff_ = 1;
}
// Shuts the connection down via close().
//
// close() uses the throwing overload of socket_.close(); an exception leaving
// a destructor would terminate the program during stack unwinding, so any
// failure is reported on std::cerr and swallowed here instead.
EagerConnection::~EagerConnection()
{
    try
    {
        close();
    }
    catch (std::exception const &e)
    {
        std::cerr << "Error while closing EagerConnection in destructor: " << e.what() << std::endl;
    }
    catch (...)
    {
        std::cerr << "Unknown error while closing EagerConnection in destructor." << std::endl;
    }
}
// Marks the connection as opened and issues the first asynchronous read.
// No resolve/connect step is performed here; resolveHost_/resolvePort_ are
// left untouched.
void EagerConnection::open() // called when accepted
{
opened_ = true;
startRead();
}
// (Re)opens the connection towards dest, which has to look like "host:port".
//
// Any existing connection is closed first. If dest cannot be split into
// exactly two parts at ':' a std::runtime_error is thrown and the object
// stays closed. Otherwise the back-off is reset, the target is remembered for
// later reconnect attempts, and name resolution starts right away.
void EagerConnection::open(std::string const &dest)
{
    close();

    std::string hostPart;
    std::string portPart;
    int const pieces = split(dest, ':', hostPart, portPart);
    if (pieces != 2)
    {
        throw std::runtime_error("open() destination must be host:port");
    }

    backoff_ = 1;
    opened_ = true;
    resolveHost_ = hostPart;
    resolvePort_ = portPart;

    startResolve();
}
// Closes an opened connection: clears the opened flag, closes the socket,
// cancels the retry timer and then, under mutex_, throws away everything that
// is buffered in either direction. Calling it on a connection that is not
// opened has no effect.
void EagerConnection::close()
{
    if (opened_)
    {
        opened_ = false;
        socket_.close();
        timer_.cancel();

        // The buffers are shared with the I/O handlers, hence the lock.
        grab guard(mutex_);
        writeBuf_.clear();
        inBuf_.clear();
        outgoing_.clear();
        incoming_.clear();
    }
}
// Reports the opened flag: set by open()/open(dest), cleared by close().
// The flag is read without taking mutex_.
bool EagerConnection::opened()
{
return opened_;
}
size_t EagerConnection::pendingOut()
{
return outgoing_.size();
}
size_t EagerConnection::pendingIn()
{
return incoming_.size();
}
// Copies up to maxSize received bytes into ptr and removes them from the
// incoming buffer.
//
// Returns the number of bytes removed (the function previously fell off the
// end without returning a value, which is undefined behaviour). The lock is
// held across both steps so no other thread can change incoming_ between the
// peek and the consume; mutex_ is recursive, so the nested locking in
// peekIn()/consume() is fine.
size_t EagerConnection::readIn(void *ptr, size_t maxSize)
{
    grab aholdof(mutex_);
    size_t n = peekIn(ptr, maxSize);
    consume(n);
    return n;
}
// Copies the first min(maxSize, buffered) received bytes into ptr without
// removing them, and returns that count. With a null ptr nothing is copied,
// but the count is still returned.
size_t EagerConnection::peekIn(void *ptr, size_t maxSize)
{
    grab guard(mutex_);

    size_t const available = incoming_.size();
    size_t const count = (maxSize > available) ? available : maxSize;

    if (ptr != 0 && count > 0)
    {
        memcpy(ptr, &incoming_[0], count);
    }
    return count;
}
void EagerConnection::consume(size_t n)
{
grab aholdof(mutex_);
if (n > 0)
{
if (n > incoming_.size())
{
throw std::runtime_error("EagerConnection::consume() of too much data");
}
incoming_.erase(incoming_.begin(), incoming_.begin() + n);
}
}
// Queues size bytes starting at data for sending and kicks off a write if
// none is currently in flight.
// Throws std::runtime_error when the connection has not been opened.
void EagerConnection::writeOut(void const *data, size_t size)
{
    if (!opened_)
    {
        throw std::runtime_error("Attempt to writeOut() to a non-open() EagerConnection.");
    }

    char const *const first = static_cast<char const *>(data);
    char const *const last = first + size;

    grab guard(mutex_);
    outgoing_.insert(outgoing_.end(), first, last);
    startWrite();
}
// Drops the current socket and all buffered data, then notifies listeners
// through onDisconnect_.
// Unlike close(), this does not touch opened_ and does not cancel timer_.
void EagerConnection::abort()
{
socket_.close();
{
// Inner scope: the lock is released before the signal is emitted.
grab aholdof(mutex_);
writeBuf_.clear();
inBuf_.clear();
outgoing_.clear();
incoming_.clear();
}
onDisconnect_();
}
// Returns a copy of endpoint_, which on_resolve() assigns after a successful
// name resolution.
tcp::endpoint EagerConnection::endpoint() const
{
return endpoint_;
}
void EagerConnection::startResolve()
{
std::cerr << "Resolving " << resolveHost_ << ":" << resolvePort_ << std::endl;
boost::system::error_code error;
tcp::resolver::iterator iter = resolver_.resolve(tcp::resolver::query(resolveHost_, resolvePort_), error);
on_resolve(error, iter);
}
// Consumes the outcome of a name resolution.
// On failure the error is logged and a retry is scheduled; on success the
// first resolved endpoint is remembered in endpoint_ and a connect to it is
// started.
void EagerConnection::on_resolve(boost::system::error_code const &err, tcp::resolver::iterator endpoint)
{
    if (err)
    {
        std::cerr << "Could not resolve " << resolveHost_ << ":" << resolvePort_ << std::endl;
        tryLater();
        return;
    }

    endpoint_ = *endpoint;
    startConnect(endpoint_);
}
// Arms timer_ so that startResolve() runs again after backoff_ seconds, then
// lengthens backoff_ for the next attempt. Nothing happens when no host/port
// target has been recorded.
//
// NOTE(review): the handler is bound without the timer's error_code, so it
// also runs when the wait is cancelled or the timer is re-armed -- confirm
// this is intended.
void EagerConnection::tryLater()
{
    if (resolveHost_.empty() || resolvePort_.empty())
    {
        return;
    }

    std::cerr << "Trying again in " << backoff_ << " seconds." << std::endl;
    timer_.expires_from_now(boost::posix_time::seconds(backoff_));
    timer_.async_wait(strand_.wrap(boost::bind(&EagerConnection::startResolve, this)));

    // Grow the delay by roughly 1.5x per attempt; once it passes 100 seconds
    // it is pinned to 180, i.e. never more than three minutes between tries.
    int nextDelay = static_cast<int>((backoff_ + 1) * 1.5);
    if (nextDelay > 100)
    {
        nextDelay = 180;
    }
    backoff_ = nextDelay;
}
// Starts an asynchronous connect of socket_ to endpoint. The completion is
// delivered through strand_ to on_connect(), together with a copy of the
// endpoint so the handler can log it.
// NOTE(review): the handler binds a raw `this`; the object presumably must
// outlive the pending operation -- confirm with the owning code.
void EagerConnection::startConnect(tcp::endpoint endpoint)
{
std::cerr << "Connecting to " << endpoint << std::endl;
socket_.async_connect(endpoint,
strand_.wrap(boost::bind(&EagerConnection::on_connect, this, placeholders::error, endpoint)));
}
// Completion handler for startConnect().
// A failed attempt is logged and re-scheduled through tryLater(). A
// successful one resets the back-off, emits onConnect_ and then begins
// reading from the socket.
void EagerConnection::on_connect(boost::system::error_code const &err, tcp::endpoint endpoint)
{
    if (err)
    {
        std::cerr << "Error connecting to " << endpoint << std::endl;
        tryLater();
        return;
    }

    std::cerr << "Connected to " << endpoint << std::endl;
    backoff_ = 1;
    onConnect_();
    startRead();
}
void EagerConnection::startWrite()
{
if (socket_.is_open())
{
grab aholdof(mutex_);
if (outgoing_.size() && !writePending_)
{
writeBuf_.swap(outgoing_);
outgoing_.clear();
writePending_ = true;
char const *ptr = &writeBuf_[0];
boost::asio::async_write(socket_, boost::asio::buffer(ptr, writeBuf_.size()),
strand_.wrap(boost::bind(&EagerConnection::on_write, this, placeholders::error, placeholders::bytes_transferred)));
}
}
}
// Completion handler for the asynchronous write started by startWrite().
//
// err  - result of the write.
// xfer - number of bytes actually written.
//
// Clears writePending_, and on an error or short write closes the socket and
// emits onDisconnect_. A reconnect is only scheduled while the connection is
// still opened: a write aborted by close() also ends up here, and re-trying
// unconditionally would re-establish a connection the caller just closed
// (on_read() applies the same opened_ check). Finally startWrite() is called
// to send anything queued while this write was in flight; it is a no-op when
// the socket has just been closed.
void EagerConnection::on_write(boost::system::error_code const &err, size_t xfer)
{
    grab aholdof(mutex_);
    writePending_ = false;
    if (!!err || xfer < writeBuf_.size())
    {
        std::cerr << "Short write on socket to " << resolveHost_ << ":" << resolvePort_ << ": " << err << std::endl;
        socket_.close();
        onDisconnect_();
        if (opened_)
        {
            tryLater();
        }
    }
    startWrite();
}
// There is a trade-off between throughput and ability to have many connections open.
// I may want to have 10,000 connections open (monitoring a large cluster of machines, say)
// and I don't want to waste more than 160 MB on such a set-up: 10k * 16k buffers == 160M
void EagerConnection::startRead()
{
grab aholdof(mutex_);
inBuf_.resize(16384);
char *ptr = &inBuf_[0];
boost::asio::async_read(socket_, boost::asio::buffer(ptr, 16384), boost::asio::transfer_at_least(1),
strand_.wrap(boost::bind(&EagerConnection::on_read, this, placeholders::error, placeholders::bytes_transferred)));
}
// Completion handler for the asynchronous read started by startRead().
//
// err  - result of the read.
// xfer - number of bytes placed in inBuf_.
//
// On success the new bytes are appended to incoming_ (under mutex_), onData_
// is emitted and the next read is started. On failure the socket is closed,
// onDisconnect_ is emitted and, if the connection is still opened, a
// reconnect is scheduled.
//
// interlock_ counts handlers currently inside the onData_ emission so that
// overlapping invocations are detected. The atomic updates are performed
// outside assert(): an expression inside assert() disappears entirely when
// NDEBUG is defined, so side effects must never live there.
void EagerConnection::on_read(boost::system::error_code const &err, size_t xfer)
{
    if (!err)
    {
        if (xfer > 0)
        {
            grab aholdof(mutex_);
            incoming_.insert(incoming_.end(), inBuf_.begin(), inBuf_.begin() + xfer);
        }
        int const onEntry = __sync_fetch_and_add(&interlock_, 1);
        assert(0 == onEntry);
        (void)onEntry; // silences the unused-variable warning in NDEBUG builds
        onData_();
        int const onExit = __sync_fetch_and_add(&interlock_, -1);
        assert(1 == onExit);
        (void)onExit;
        startRead();
    }
    else
    {
        std::cerr << "Socket error for " << resolveHost_ << ":" << resolvePort_ << ": " << err << std::endl;
        socket_.close();
        onDisconnect_();
        if (opened_)
        {
            tryLater();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment