Created
May 18, 2011 18:37
-
-
Save anonymous/979212 to your computer and use it in GitHub Desktop.
Problems using boost::asio::strand
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <string> | |
#include <vector> | |
#include <boost/asio.hpp> | |
#include <boost/signals.hpp> | |
#include <boost/noncopyable.hpp> | |
#include <boost/shared_ptr.hpp> | |
#include <boost/enable_shared_from_this.hpp> | |
#include <boost/thread/locks.hpp> | |
#include <boost/thread/recursive_mutex.hpp> | |
#include <boost/bind.hpp> | |
#include <iostream> | |
#include <map> | |
/* inlined private headers */ | |
int split(std::string const &src, char ch, std::string &oLeft, std::string &oRight); | |
void trim(std::string &str); | |
void munge(std::string &key); | |
std::string counter_filename(std::string const &ctr); | |
int querystring(std::string const &in, std::map<std::string, std::string> &oQuery); | |
void urldecode(std::string &str); | |
bool str_pat_match(std::string const &str, std::string const &pat); | |
typedef boost::lock_guard<boost::recursive_mutex> grab; | |
typedef boost::recursive_mutex lock; | |
/* class declaration */ | |
// EagerConnection will attempt to establish a connection to a remote side. | |
// | |
class EagerConnection : public boost::noncopyable, public boost::enable_shared_from_this<EagerConnection> | |
{ | |
public: | |
typedef std::vector<char> buffer; | |
EagerConnection(boost::asio::io_service &svc); | |
EagerConnection(std::string const &dest, boost::asio::io_service &svc); | |
~EagerConnection(); | |
void open(); | |
void open(std::string const &dest); | |
void close(); | |
bool opened(); | |
size_t pendingOut(); | |
size_t pendingIn(); | |
size_t readIn(void *ptr, size_t maxSize); | |
size_t peekIn(void *ptr, size_t maxSize); | |
void consume(size_t n); | |
void writeOut(void const *data, size_t size); | |
void abort(); | |
boost::asio::ip::tcp::endpoint endpoint() const; | |
boost::signal<void ()> onConnect_; | |
boost::signal<void ()> onDisconnect_; | |
boost::signal<void ()> onData_; | |
private: | |
friend class EagerConnectionFactory; | |
void tryLater(); | |
void startResolve(); | |
void on_resolve(boost::system::error_code const &err, boost::asio::ip::tcp::resolver::iterator endpoint); | |
void startConnect(boost::asio::ip::tcp::endpoint endpoint); | |
void on_connect(boost::system::error_code const &err, boost::asio::ip::tcp::endpoint endpoint); | |
void startWrite(); | |
void on_write(boost::system::error_code const &err, size_t xfer); | |
void startRead(); | |
void on_read(boost::system::error_code const &err, size_t xfer); | |
void init(); | |
bool opened_; | |
bool writePending_; | |
int backoff_; | |
int interlock_; | |
boost::asio::ip::tcp::socket socket_; | |
boost::asio::ip::tcp::resolver resolver_; | |
boost::asio::deadline_timer timer_; | |
boost::asio::strand strand_; | |
lock mutex_; | |
buffer writeBuf_; | |
buffer outgoing_; | |
buffer inBuf_; | |
buffer incoming_; | |
std::string resolveHost_; | |
std::string resolvePort_; | |
boost::asio::ip::tcp::endpoint endpoint_; | |
}; | |
/* class definition */ | |
using boost::asio::ip::tcp; | |
using namespace boost::asio; | |
EagerConnection::EagerConnection(boost::asio::io_service &svc) : | |
socket_(svc), | |
resolver_(svc), | |
timer_(svc), | |
strand_(svc), | |
interlock_(0) | |
{ | |
init(); | |
} | |
EagerConnection::EagerConnection(std::string const &dest, boost::asio::io_service &svc) : | |
socket_(svc), | |
resolver_(svc), | |
timer_(svc), | |
strand_(svc) | |
{ | |
init(); | |
open(dest); | |
} | |
void EagerConnection::init() | |
{ | |
opened_ = false; | |
backoff_ = 1; | |
} | |
EagerConnection::~EagerConnection() | |
{ | |
close(); | |
} | |
void EagerConnection::open() // called when accepted | |
{ | |
opened_ = true; | |
startRead(); | |
} | |
void EagerConnection::open(std::string const &dest) | |
{ | |
close(); | |
std::string host; | |
std::string port; | |
if (split(dest, ':', host, port) != 2) | |
{ | |
throw std::runtime_error("open() destination must be host:port"); | |
} | |
backoff_ = 1; | |
opened_ = true; | |
resolveHost_ = host; | |
resolvePort_ = port; | |
startResolve(); | |
} | |
void EagerConnection::close() | |
{ | |
if (!opened_) | |
{ | |
return; | |
} | |
opened_ = false; | |
socket_.close(); | |
timer_.cancel(); | |
grab aholdof(mutex_); | |
writeBuf_.clear(); | |
inBuf_.clear(); | |
outgoing_.clear(); | |
incoming_.clear(); | |
} | |
bool EagerConnection::opened() | |
{ | |
return opened_; | |
} | |
size_t EagerConnection::pendingOut() | |
{ | |
return outgoing_.size(); | |
} | |
size_t EagerConnection::pendingIn() | |
{ | |
return incoming_.size(); | |
} | |
size_t EagerConnection::readIn(void *ptr, size_t maxSize) | |
{ | |
grab aholdof(mutex_); | |
size_t n = peekIn(ptr, maxSize); | |
consume(n); | |
} | |
size_t EagerConnection::peekIn(void *ptr, size_t maxSize) | |
{ | |
grab aholdof(mutex_); | |
size_t n = maxSize; | |
if (n > incoming_.size()) | |
{ | |
n = incoming_.size(); | |
} | |
if (n > 0 && ptr != 0) | |
{ | |
memcpy(ptr, &incoming_[0], n); | |
} | |
return n; | |
} | |
void EagerConnection::consume(size_t n) | |
{ | |
grab aholdof(mutex_); | |
if (n > 0) | |
{ | |
if (n > incoming_.size()) | |
{ | |
throw std::runtime_error("EagerConnection::consume() of too much data"); | |
} | |
incoming_.erase(incoming_.begin(), incoming_.begin() + n); | |
} | |
} | |
void EagerConnection::writeOut(void const *data, size_t size) | |
{ | |
if (!opened_) | |
{ | |
throw std::runtime_error("Attempt to writeOut() to a non-open() EagerConnection."); | |
} | |
grab aholdof(mutex_); | |
outgoing_.insert(outgoing_.end(), (char const *)data, (char const *)data + size); | |
startWrite(); | |
} | |
void EagerConnection::abort() | |
{ | |
socket_.close(); | |
{ | |
grab aholdof(mutex_); | |
writeBuf_.clear(); | |
inBuf_.clear(); | |
outgoing_.clear(); | |
incoming_.clear(); | |
} | |
onDisconnect_(); | |
} | |
tcp::endpoint EagerConnection::endpoint() const | |
{ | |
return endpoint_; | |
} | |
void EagerConnection::startResolve() | |
{ | |
std::cerr << "Resolving " << resolveHost_ << ":" << resolvePort_ << std::endl; | |
boost::system::error_code error; | |
tcp::resolver::iterator iter = resolver_.resolve(tcp::resolver::query(resolveHost_, resolvePort_), error); | |
on_resolve(error, iter); | |
} | |
void EagerConnection::on_resolve(boost::system::error_code const &err, tcp::resolver::iterator endpoint) | |
{ | |
if(!err) | |
{ | |
endpoint_ = *endpoint; | |
startConnect(*endpoint); | |
} | |
else | |
{ | |
std::cerr << "Could not resolve " << resolveHost_ << ":" << resolvePort_ << std::endl; | |
tryLater(); | |
} | |
} | |
void EagerConnection::tryLater() | |
{ | |
if (resolveHost_.size() && resolvePort_.size()) | |
{ | |
std::cerr << "Trying again in " << backoff_ << " seconds." << std::endl; | |
timer_.expires_from_now(boost::posix_time::seconds(backoff_)); | |
timer_.async_wait(strand_.wrap(boost::bind(&EagerConnection::startResolve, this))); | |
backoff_ = (int)((backoff_ + 1) * 1.5); | |
if (backoff_ > 100) | |
{ | |
// wait at most 3 minutes, even when backing off | |
backoff_ = 180; | |
} | |
} | |
} | |
void EagerConnection::startConnect(tcp::endpoint endpoint) | |
{ | |
std::cerr << "Connecting to " << endpoint << std::endl; | |
socket_.async_connect(endpoint, | |
strand_.wrap(boost::bind(&EagerConnection::on_connect, this, placeholders::error, endpoint))); | |
} | |
void EagerConnection::on_connect(boost::system::error_code const &err, tcp::endpoint endpoint) | |
{ | |
if (!err) | |
{ | |
std::cerr << "Connected to " << endpoint << std::endl; | |
backoff_ = 1; | |
onConnect_(); | |
startRead(); | |
} | |
else | |
{ | |
std::cerr << "Error connecting to " << endpoint << std::endl; | |
tryLater(); | |
} | |
} | |
void EagerConnection::startWrite() | |
{ | |
if (socket_.is_open()) | |
{ | |
grab aholdof(mutex_); | |
if (outgoing_.size() && !writePending_) | |
{ | |
writeBuf_.swap(outgoing_); | |
outgoing_.clear(); | |
writePending_ = true; | |
char const *ptr = &writeBuf_[0]; | |
boost::asio::async_write(socket_, boost::asio::buffer(ptr, writeBuf_.size()), | |
strand_.wrap(boost::bind(&EagerConnection::on_write, this, placeholders::error, placeholders::bytes_transferred))); | |
} | |
} | |
} | |
void EagerConnection::on_write(boost::system::error_code const &err, size_t xfer) | |
{ | |
grab aholdof(mutex_); | |
writePending_ = false; | |
if (!!err || xfer < writeBuf_.size()) | |
{ | |
std::cerr << "Short write on socket to " << resolveHost_ << ":" << resolvePort_ << ": " << err << std::endl; | |
socket_.close(); | |
onDisconnect_(); | |
tryLater(); | |
} | |
startWrite(); | |
} | |
// There is a trade-off between throughput and ability to have many connections open. | |
// I may want to have 10,000 connections open (monitoring a large cluster of machines, say) | |
// and I don't want to waste more than 160 MB on such a set-up: 10k * 16k buffers == 160M | |
void EagerConnection::startRead() | |
{ | |
grab aholdof(mutex_); | |
inBuf_.resize(16384); | |
char *ptr = &inBuf_[0]; | |
boost::asio::async_read(socket_, boost::asio::buffer(ptr, 16384), boost::asio::transfer_at_least(1), | |
strand_.wrap(boost::bind(&EagerConnection::on_read, this, placeholders::error, placeholders::bytes_transferred))); | |
} | |
void EagerConnection::on_read(boost::system::error_code const &err, size_t xfer) | |
{ | |
if (!err) | |
{ | |
if (xfer > 0) | |
{ | |
grab aholdof(mutex_); | |
incoming_.insert(incoming_.end(), inBuf_.begin(), inBuf_.begin() + xfer); | |
} | |
assert(0 == __sync_fetch_and_add(&interlock_, 1)); | |
onData_(); | |
assert(1 == __sync_fetch_and_add(&interlock_, -1)); | |
startRead(); | |
} | |
else | |
{ | |
std::cerr << "Socket error for " << resolveHost_ << ":" << resolvePort_ << ": " << err << std::endl; | |
socket_.close(); | |
onDisconnect_(); | |
if (opened_) | |
{ | |
tryLater(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment