Skip to content

Instantly share code, notes, and snippets.

@wackoisgod
Created August 3, 2012 03:20
Show Gist options
  • Save wackoisgod/3244026 to your computer and use it in GitHub Desktop.
Save wackoisgod/3244026 to your computer and use it in GitHub Desktop.
#ifndef ENYO_PROTOBUF_CONNECTION_HPP
#define ENYO_PROTOBUF_CONNECTION_HPP
#include "asioconnection.hpp"
#include <boost/cstdint.hpp>
#include <boost/bind.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/shared_ptr.hpp>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <vector>
#include <string>
#include <deque>
#include "enyoserver.hpp"
#include <iostream>
#include "logging.hpp"
template <class T> class ProtobufConnection
: public AsioConnection<T> {
public:
virtual ~ProtobufConnection();
virtual void start();
virtual void send(std::string &msg);
void send(const google::protobuf::Message &m);
protected:
ProtobufConnection(EnyoServer *app);
virtual void receive(const std::string &encodedMsg) = 0;
virtual bool timeoutsEnabled();
private:
// TODO: shouldn't these be asio buffers?
std::vector<unsigned char> varintBuf_;
std::vector<unsigned char> messageBuf_;
boost::asio::deadline_timer timer_;
uint32_t messageLength_;
std::deque<std::string> outbound_;
std::deque<boost::shared_ptr<google::protobuf::Message> > outboundTwo_;
bool writing_;
boost::asio::strand strand_;
std::string writeData_;
boost::asio::const_buffers_1 writeBuf_;
static const size_t MAX_PAYLOAD_SIZE = 65535; // TODO: revisit this, it seems too large
static const unsigned GENERAL_TIMEOUT = 3000;
static const unsigned HEADER_TIMEOUT = 250; ///< 250ms to wait for body
void transmitNext();
void transmitNextTwo();
void setupMessageData(const google::protobuf::Message &message);
void waitForHeader();
void headerReceived(const boost::system::error_code &ec, size_t bytes_transferred);
void messageReceived(const boost::system::error_code &ec, size_t bytes_transferred);
void sendMessageDone(const boost::system::error_code &ec, size_t bytes_transferred);
void timeout(const boost::system::error_code &ec);
void setTimer(uint64_t duration);
};
template <class T> ProtobufConnection<T>::ProtobufConnection(EnyoServer *app)
: AsioConnection<T>(app),
timer_(app->io()),
outbound_(),
writing_(false),
strand_(app->io()),
writeData_(),
writeBuf_(boost::asio::buffer(reinterpret_cast<const void*>(writeData_.data()), 0)) {
}
template <class T> ProtobufConnection<T>::~ProtobufConnection() {
// do nothing
}
template <class T> void ProtobufConnection<T>::start() {
// initialize buffers
varintBuf_.resize(sizeof(uint32_t)); // 4 bytes worst-case
messageBuf_.resize(MAX_PAYLOAD_SIZE);
waitForHeader();
}
template <class T> bool ProtobufConnection<T>::timeoutsEnabled() {
return false;
}
template <class T> void ProtobufConnection<T>::send(std::string &message) {
VLOG(1) << "send(): sending or queueing message";
outbound_.push_back(message);
transmitNext(); // only succeeds if able
}
template <class T> void ProtobufConnection<T>::send(const google::protobuf::Message &m) {
VLOG(1) << "send(): sending or queueing message";
boost::shared_ptr<google::protobuf::Message> m2(m.New());
m2.CopyFrom(m);
outboundTwo_.push_back(m2);
transmitNextTwo(); // only succeeds if able
}
template <class T> void ProtobufConnection<T>::setupMessageData(const google::protobuf::Message &message) {
// allocate the buffer, varint32 + payload
writeData_.clear();
writeData_.reserve(message.SpaceUsed());
google::protobuf::io::StringOutputStream sos(&writeData_);
google::protobuf::io::CodedOutputStream os(&sos);
os.WriteVarint32(message.ByteSize());
message.SerializeToCodedStream(&os);
}
template <class T> void ProtobufConnection<T>::transmitNextTwo() {
if (writing_) {
VLOG(1) << "transmitNext(): already writing, will transmit later";
return;
}
else if (outboundTwo_.empty()) {
VLOG(1) << "transmitNext(): nothing in outbound queue to send";
return;
}
else {
VLOG(1) << "transmitNext(): setting to write mode";
writing_ = true;
}
setupMessageData(outboundTwo_.front());
outboundTwo_.pop_front();
//VLOG(1) << "transmitNext(): raw message length: " << message.ByteSize() << ", varint length: " << message.SpaceUsed() ;
writeBuf_ = boost::asio::buffer(writeData_.data(), writeData_.size());
VLOG(1) << "transmitNext(): outbound write size: " << boost::asio::buffer_size(writeBuf_);
boost::asio::async_write(
this->socket_,
writeBuf_,
strand_.wrap(boost::bind(
&ProtobufConnection<T>::sendMessageDone,
this->shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
))
);
}
template <class T> void ProtobufConnection<T>::transmitNext() {
if (writing_) {
VLOG(1) << "transmitNext(): already writing, will transmit later";
return;
}
else if (outbound_.empty()) {
VLOG(1) << "transmitNext(): nothing in outbound queue to send";
return;
}
else {
VLOG(1) << "transmitNext(): setting to write mode";
writing_ = true;
}
// get next in queue
std::string message(outbound_.front());
outbound_.pop_front();
std::size_t messageLength = message.length();
std::size_t varintLength = google::protobuf::io::CodedOutputStream::VarintSize32(messageLength);
VLOG(1) << "transmitNext(): raw message length: " << messageLength << ", varint length: " << varintLength;
assert(message.length() >= 3);
// allocate the buffer, varint32 + payload
writeData_.clear();
writeData_.reserve(varintLength + messageLength);
google::protobuf::io::StringOutputStream sos(&writeData_);
google::protobuf::io::CodedOutputStream os(&sos);
os.WriteVarint32(messageLength);
os.WriteString(message);
writeBuf_ = boost::asio::buffer(writeData_.data(), writeData_.size());
VLOG(1) << "transmitNext(): outbound write size: " << boost::asio::buffer_size(writeBuf_);
boost::asio::async_write(
this->socket_,
writeBuf_,
strand_.wrap(boost::bind(
&ProtobufConnection<T>::sendMessageDone,
this->shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
))
);
}
template <class T> void ProtobufConnection<T>::sendMessageDone(const boost::system::error_code &ec, size_t bytes_transferred) {
if (ec) {
VLOG(1) << "sendMessageDone(): error closing connection" << ec;
this->socket_.close(); // explicit, but unnecessary
}
else {
VLOG(1) << "sendMessageDone(): bytes written: " << bytes_transferred;
writing_ = false;
transmitNext(); // only succeeds if able
}
}
template <class T> void ProtobufConnection<T>::waitForHeader() {
boost::asio::async_read(
this->socket_,
boost::asio::buffer(&varintBuf_[0], varintBuf_.size()),
boost::bind(
&ProtobufConnection<T>::headerReceived,
this->shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
);
setTimer(GENERAL_TIMEOUT);
}
template <class T> void ProtobufConnection<T>::headerReceived(const boost::system::error_code &ec, size_t bytes_transferred) {
if (ec) {
return;
}
google::protobuf::io::ArrayInputStream ais(&varintBuf_[0], varintBuf_.size());
google::protobuf::io::CodedInputStream is(&ais);
// get the binary payload length
is.ReadVarint32(&messageLength_);
// if the payload is bigger than the message buffer, kill the motherfucker
if (messageLength_ > messageBuf_.size()) {
return; // TODO: additional cleanup?
}
boost::asio::async_read(
this->socket_,
boost::asio::buffer(
&messageBuf_[0],
messageLength_ - (
// the amount already read in the header buffer
varintBuf_.size() - google::protobuf::io::CodedOutputStream::VarintSize32(messageLength_)
)
),
boost::bind(
&ProtobufConnection<T>::messageReceived,
this->shared_from_this(),
boost::asio::placeholders::error,
boost::asio::placeholders::bytes_transferred
)
);
setTimer(HEADER_TIMEOUT);
}
template <class T> void ProtobufConnection<T>::messageReceived(const boost::system::error_code &ec, size_t bytes_transferred) {
if (ec) {
return;
}
google::protobuf::io::ArrayInputStream ais_HDR(&varintBuf_[0], varintBuf_.size());
google::protobuf::io::ArrayInputStream ais_MSG(&messageBuf_[0], messageBuf_.size());
// bridge the two data buffers into a single stream
google::protobuf::io::ZeroCopyInputStream *inputStreams[2] = {
&ais_HDR,
&ais_MSG
};
google::protobuf::io::ConcatenatingInputStream cis(inputStreams, 2);
google::protobuf::io::CodedInputStream is(&cis);
// read the varint again (dummy operation for variable seek)
uint32_t temp = 0;
is.ReadVarint32(&temp);
// read in the payload
std::string payload;
payload.reserve(messageLength_);
is.ReadString(&payload, messageLength_);
receive(payload);
waitForHeader(); // keep listening
// TODO: timeout mechanism should be enabled/disabled based on
// concrete connection type. not all connections may use it.
setTimer(GENERAL_TIMEOUT); // TODO: seems weird to set a timer when waiting indefinitely for next message
}
template <class T> void ProtobufConnection<T>::timeout(const boost::system::error_code &ec) {
// TODO: what to do here to cleanup and unregister from battles, etc?
// TODO: make sure timers don't fire when we're in the middle of processing messages
if (ec != boost::asio::error::operation_aborted) {
VLOG(1) << "ProtobufConnection<T>::timeout: error closing connection";
this->socket_.close();
}
}
template <class T> void ProtobufConnection<T>::setTimer(uint64_t duration) {
if (!timeoutsEnabled()) {
return;
}
if (duration) {
timer_.expires_from_now(boost::posix_time::milliseconds(duration)); // this will cancel all pending
timer_.async_wait(boost::bind(
&ProtobufConnection<T>::timeout,
this, boost::asio::placeholders::error
));
}
else {
timer_.cancel();
}
}
#endif // ENYO_PROTOBUF_CONNECTION_HPP
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment