Created
August 3, 2012 03:20
-
-
Save wackoisgod/3244026 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#ifndef ENYO_PROTOBUF_CONNECTION_HPP | |
#define ENYO_PROTOBUF_CONNECTION_HPP | |
#include "asioconnection.hpp" | |
#include <boost/cstdint.hpp> | |
#include <boost/bind.hpp> | |
#include <boost/date_time/posix_time/posix_time.hpp> | |
#include <boost/shared_ptr.hpp> | |
#include <google/protobuf/io/coded_stream.h> | |
#include <google/protobuf/io/zero_copy_stream_impl.h> | |
#include <google/protobuf/io/zero_copy_stream_impl_lite.h> | |
#include <google/protobuf/message.h> | |
#include <google/protobuf/descriptor.h> | |
#include <vector> | |
#include <string> | |
#include <deque> | |
#include "enyoserver.hpp" | |
#include <iostream> | |
#include "logging.hpp" | |
template <class T> class ProtobufConnection | |
: public AsioConnection<T> { | |
public: | |
virtual ~ProtobufConnection(); | |
virtual void start(); | |
virtual void send(std::string &msg); | |
void send(const google::protobuf::Message &m); | |
protected: | |
ProtobufConnection(EnyoServer *app); | |
virtual void receive(const std::string &encodedMsg) = 0; | |
virtual bool timeoutsEnabled(); | |
private: | |
// TODO: shouldn't these be asio buffers? | |
std::vector<unsigned char> varintBuf_; | |
std::vector<unsigned char> messageBuf_; | |
boost::asio::deadline_timer timer_; | |
uint32_t messageLength_; | |
std::deque<std::string> outbound_; | |
std::deque<boost::shared_ptr<google::protobuf::Message> > outboundTwo_; | |
bool writing_; | |
boost::asio::strand strand_; | |
std::string writeData_; | |
boost::asio::const_buffers_1 writeBuf_; | |
static const size_t MAX_PAYLOAD_SIZE = 65535; // TODO: revisit this, it seems too large | |
static const unsigned GENERAL_TIMEOUT = 3000; | |
static const unsigned HEADER_TIMEOUT = 250; ///< 250ms to wait for body | |
void transmitNext(); | |
void transmitNextTwo(); | |
void setupMessageData(const google::protobuf::Message &message); | |
void waitForHeader(); | |
void headerReceived(const boost::system::error_code &ec, size_t bytes_transferred); | |
void messageReceived(const boost::system::error_code &ec, size_t bytes_transferred); | |
void sendMessageDone(const boost::system::error_code &ec, size_t bytes_transferred); | |
void timeout(const boost::system::error_code &ec); | |
void setTimer(uint64_t duration); | |
}; | |
template <class T> ProtobufConnection<T>::ProtobufConnection(EnyoServer *app) | |
: AsioConnection<T>(app), | |
timer_(app->io()), | |
outbound_(), | |
writing_(false), | |
strand_(app->io()), | |
writeData_(), | |
writeBuf_(boost::asio::buffer(reinterpret_cast<const void*>(writeData_.data()), 0)) { | |
} | |
template <class T> ProtobufConnection<T>::~ProtobufConnection() { | |
// do nothing | |
} | |
template <class T> void ProtobufConnection<T>::start() { | |
// initialize buffers | |
varintBuf_.resize(sizeof(uint32_t)); // 4 bytes worst-case | |
messageBuf_.resize(MAX_PAYLOAD_SIZE); | |
waitForHeader(); | |
} | |
template <class T> bool ProtobufConnection<T>::timeoutsEnabled() { | |
return false; | |
} | |
template <class T> void ProtobufConnection<T>::send(std::string &message) { | |
VLOG(1) << "send(): sending or queueing message"; | |
outbound_.push_back(message); | |
transmitNext(); // only succeeds if able | |
} | |
template <class T> void ProtobufConnection<T>::send(const google::protobuf::Message &m) { | |
VLOG(1) << "send(): sending or queueing message"; | |
boost::shared_ptr<google::protobuf::Message> m2(m.New()); | |
m2.CopyFrom(m); | |
outboundTwo_.push_back(m2); | |
transmitNextTwo(); // only succeeds if able | |
} | |
template <class T> void ProtobufConnection<T>::setupMessageData(const google::protobuf::Message &message) { | |
// allocate the buffer, varint32 + payload | |
writeData_.clear(); | |
writeData_.reserve(message.SpaceUsed()); | |
google::protobuf::io::StringOutputStream sos(&writeData_); | |
google::protobuf::io::CodedOutputStream os(&sos); | |
os.WriteVarint32(message.ByteSize()); | |
message.SerializeToCodedStream(&os); | |
} | |
template <class T> void ProtobufConnection<T>::transmitNextTwo() { | |
if (writing_) { | |
VLOG(1) << "transmitNext(): already writing, will transmit later"; | |
return; | |
} | |
else if (outboundTwo_.empty()) { | |
VLOG(1) << "transmitNext(): nothing in outbound queue to send"; | |
return; | |
} | |
else { | |
VLOG(1) << "transmitNext(): setting to write mode"; | |
writing_ = true; | |
} | |
setupMessageData(outboundTwo_.front()); | |
outboundTwo_.pop_front(); | |
//VLOG(1) << "transmitNext(): raw message length: " << message.ByteSize() << ", varint length: " << message.SpaceUsed() ; | |
writeBuf_ = boost::asio::buffer(writeData_.data(), writeData_.size()); | |
VLOG(1) << "transmitNext(): outbound write size: " << boost::asio::buffer_size(writeBuf_); | |
boost::asio::async_write( | |
this->socket_, | |
writeBuf_, | |
strand_.wrap(boost::bind( | |
&ProtobufConnection<T>::sendMessageDone, | |
this->shared_from_this(), | |
boost::asio::placeholders::error, | |
boost::asio::placeholders::bytes_transferred | |
)) | |
); | |
} | |
template <class T> void ProtobufConnection<T>::transmitNext() { | |
if (writing_) { | |
VLOG(1) << "transmitNext(): already writing, will transmit later"; | |
return; | |
} | |
else if (outbound_.empty()) { | |
VLOG(1) << "transmitNext(): nothing in outbound queue to send"; | |
return; | |
} | |
else { | |
VLOG(1) << "transmitNext(): setting to write mode"; | |
writing_ = true; | |
} | |
// get next in queue | |
std::string message(outbound_.front()); | |
outbound_.pop_front(); | |
std::size_t messageLength = message.length(); | |
std::size_t varintLength = google::protobuf::io::CodedOutputStream::VarintSize32(messageLength); | |
VLOG(1) << "transmitNext(): raw message length: " << messageLength << ", varint length: " << varintLength; | |
assert(message.length() >= 3); | |
// allocate the buffer, varint32 + payload | |
writeData_.clear(); | |
writeData_.reserve(varintLength + messageLength); | |
google::protobuf::io::StringOutputStream sos(&writeData_); | |
google::protobuf::io::CodedOutputStream os(&sos); | |
os.WriteVarint32(messageLength); | |
os.WriteString(message); | |
writeBuf_ = boost::asio::buffer(writeData_.data(), writeData_.size()); | |
VLOG(1) << "transmitNext(): outbound write size: " << boost::asio::buffer_size(writeBuf_); | |
boost::asio::async_write( | |
this->socket_, | |
writeBuf_, | |
strand_.wrap(boost::bind( | |
&ProtobufConnection<T>::sendMessageDone, | |
this->shared_from_this(), | |
boost::asio::placeholders::error, | |
boost::asio::placeholders::bytes_transferred | |
)) | |
); | |
} | |
template <class T> void ProtobufConnection<T>::sendMessageDone(const boost::system::error_code &ec, size_t bytes_transferred) { | |
if (ec) { | |
VLOG(1) << "sendMessageDone(): error closing connection" << ec; | |
this->socket_.close(); // explicit, but unnecessary | |
} | |
else { | |
VLOG(1) << "sendMessageDone(): bytes written: " << bytes_transferred; | |
writing_ = false; | |
transmitNext(); // only succeeds if able | |
} | |
} | |
template <class T> void ProtobufConnection<T>::waitForHeader() { | |
boost::asio::async_read( | |
this->socket_, | |
boost::asio::buffer(&varintBuf_[0], varintBuf_.size()), | |
boost::bind( | |
&ProtobufConnection<T>::headerReceived, | |
this->shared_from_this(), | |
boost::asio::placeholders::error, | |
boost::asio::placeholders::bytes_transferred | |
) | |
); | |
setTimer(GENERAL_TIMEOUT); | |
} | |
template <class T> void ProtobufConnection<T>::headerReceived(const boost::system::error_code &ec, size_t bytes_transferred) { | |
if (ec) { | |
return; | |
} | |
google::protobuf::io::ArrayInputStream ais(&varintBuf_[0], varintBuf_.size()); | |
google::protobuf::io::CodedInputStream is(&ais); | |
// get the binary payload length | |
is.ReadVarint32(&messageLength_); | |
// if the payload is bigger than the message buffer, kill the motherfucker | |
if (messageLength_ > messageBuf_.size()) { | |
return; // TODO: additional cleanup? | |
} | |
boost::asio::async_read( | |
this->socket_, | |
boost::asio::buffer( | |
&messageBuf_[0], | |
messageLength_ - ( | |
// the amount already read in the header buffer | |
varintBuf_.size() - google::protobuf::io::CodedOutputStream::VarintSize32(messageLength_) | |
) | |
), | |
boost::bind( | |
&ProtobufConnection<T>::messageReceived, | |
this->shared_from_this(), | |
boost::asio::placeholders::error, | |
boost::asio::placeholders::bytes_transferred | |
) | |
); | |
setTimer(HEADER_TIMEOUT); | |
} | |
template <class T> void ProtobufConnection<T>::messageReceived(const boost::system::error_code &ec, size_t bytes_transferred) { | |
if (ec) { | |
return; | |
} | |
google::protobuf::io::ArrayInputStream ais_HDR(&varintBuf_[0], varintBuf_.size()); | |
google::protobuf::io::ArrayInputStream ais_MSG(&messageBuf_[0], messageBuf_.size()); | |
// bridge the two data buffers into a single stream | |
google::protobuf::io::ZeroCopyInputStream *inputStreams[2] = { | |
&ais_HDR, | |
&ais_MSG | |
}; | |
google::protobuf::io::ConcatenatingInputStream cis(inputStreams, 2); | |
google::protobuf::io::CodedInputStream is(&cis); | |
// read the varint again (dummy operation for variable seek) | |
uint32_t temp = 0; | |
is.ReadVarint32(&temp); | |
// read in the payload | |
std::string payload; | |
payload.reserve(messageLength_); | |
is.ReadString(&payload, messageLength_); | |
receive(payload); | |
waitForHeader(); // keep listening | |
// TODO: timeout mechanism should be enabled/disabled based on | |
// concrete connection type. not all connections may use it. | |
setTimer(GENERAL_TIMEOUT); // TODO: seems weird to set a timer when waiting indefinitely for next message | |
} | |
template <class T> void ProtobufConnection<T>::timeout(const boost::system::error_code &ec) { | |
// TODO: what to do here to cleanup and unregister from battles, etc? | |
// TODO: make sure timers don't fire when we're in the middle of processing messages | |
if (ec != boost::asio::error::operation_aborted) { | |
VLOG(1) << "ProtobufConnection<T>::timeout: error closing connection"; | |
this->socket_.close(); | |
} | |
} | |
template <class T> void ProtobufConnection<T>::setTimer(uint64_t duration) { | |
if (!timeoutsEnabled()) { | |
return; | |
} | |
if (duration) { | |
timer_.expires_from_now(boost::posix_time::milliseconds(duration)); // this will cancel all pending | |
timer_.async_wait(boost::bind( | |
&ProtobufConnection<T>::timeout, | |
this, boost::asio::placeholders::error | |
)); | |
} | |
else { | |
timer_.cancel(); | |
} | |
} | |
#endif // ENYO_PROTOBUF_CONNECTION_HPP |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment