@semmel
Last active July 6, 2020 20:23
Synchronize asynchronous writes to a boost.asio socket.
/* Just my current implementation of Sam Miller's answer on Stack Overflow
* @see https://stackoverflow.com/questions/7754695/boost-asio-async-write-how-to-not-interleaving-async-write-calls
*/
/*
* File: NonInterleavingAsyncSocketWrite.hpp
* Author: Matthias Seemann <seemann@visisoft.de>
* @see https://stackoverflow.com/questions/7754695/boost-asio-async-write-how-to-not-interleaving-async-write-calls
*
* When using boost::asio::async_write:
* The program must ensure that the stream performs no other write operations (such as async_write,
* the stream's async_write_some function, or any other composed operations that perform writes) until this operation completes.
* see the code for a send queue: https://stackoverflow.com/questions/7754695/boost-asio-async-write-how-to-not-interleaving-async-write-calls
* see also https://stackoverflow.com/questions/12794107/why-do-i-need-strand-per-connection-when-using-boostasio/12801042#12801042
*
* Created on September 7, 2018, 5:46 PM
*/
#ifndef NONINTERLEAVINGASYNCSOCKETWRITE_HPP
#define NONINTERLEAVINGASYNCSOCKETWRITE_HPP
#include <boost/asio.hpp>
#include <deque>
#include <iostream>
#include <string>
#include <memory>
template <class SocketClass>
class NonInterleavingAsyncSocketWrite {
    typedef std::deque<std::string> Outbox;

    std::unique_ptr<SocketClass>& socketPtr;
    boost::asio::io_context& ioContext;
    boost::asio::io_context::strand strand;
    Outbox outbox;

public:
    NonInterleavingAsyncSocketWrite(boost::asio::io_context& ioContext_, std::unique_ptr<SocketClass>& socketPtr_) :
        socketPtr(socketPtr_),
        ioContext(ioContext_),
        strand(ioContext_)
    {}

    // Entry point, callable from any thread: copies the message and hands it to the strand.
    void write(const std::string& message) {
        boost::asio::post(strand,
            [this, message]() { writeImpl(message); }
        );
    }

private:
    // Runs on the strand: enqueue, and start sending only if nothing is in flight.
    void writeImpl(const std::string& message) {
        outbox.push_back(message);
        if (outbox.size() > 1) {
            // outstanding async_write: its completion handler will send this message
            return;
        }
        this->write();
    }

    // Runs on the strand: send the message at the front of the outbox.
    void write() {
        // Take a reference, not a copy: the buffer given to async_write must stay
        // valid until the completion handler runs. The element stays in the deque
        // until then, and push_back does not invalidate references into a deque.
        const std::string& message = outbox.front();
        if (!socketPtr || !socketPtr->is_open()) {
            std::cerr << "Could not write to a socket which is not ready! Payload:'" << message << "'" << std::endl;
            // Drop everything that is queued; leftover entries would make
            // writeImpl() believe a write is still outstanding and stall the queue.
            outbox.clear();
            return;
        }
        boost::asio::async_write(
            *socketPtr,
            boost::asio::buffer(message),
            boost::asio::bind_executor(
                strand,
                [this](const boost::system::error_code& ec, std::size_t /*bytes_transferred*/) {
                    outbox.pop_front();
                    if (ec) {
                        std::cerr << "could not write: " << ec.message() << std::endl;
                        // Same reasoning as above: do not leave stale entries behind.
                        outbox.clear();
                        return;
                    }
                    if (!outbox.empty()) {
                        // more messages to send
                        this->write();
                    }
                }
            )
        );
    }
};
#endif /* NONINTERLEAVINGASYNCSOCKETWRITE_HPP */
hytano commented Jul 6, 2020

@amasciotta

Great code! What does the // outstanding async_write comment on line 53 stand for?

It marks the case where an async_write is already in flight. The completion handler of that async_write calls write() again (line 84) if the outbox is NOT empty, so there is no need to kick off another async_write here. This is what prevents interleaved writes.
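
To see the mechanism in isolation, here is a minimal Boost-free sketch of the same outbox logic. `FakeAsyncSocket` and `OutboxModel` are hypothetical names made up for this illustration and are not part of the gist or of Boost.Asio; the fake socket only records payloads and stores the completion handler so that it can be run by hand, roughly where an io_context would run it. It leaves out the strand and all error handling.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for a socket: start() begins a "write" and stores the
// completion handler; complete() runs that handler later.
class FakeAsyncSocket {
public:
    void start(const std::string& payload, std::function<void()> onDone) {
        // Starting a second write while one is in flight would interleave.
        assert(!pending_ && "a write was started while another is in flight");
        sent_.push_back(payload);
        pending_ = std::move(onDone);
    }

    // Runs the stored completion handler. Returns false if nothing was in flight.
    bool complete() {
        if (!pending_) {
            return false;
        }
        std::function<void()> handler = std::move(pending_);
        pending_ = nullptr;
        handler();
        return true;
    }

    const std::vector<std::string>& sent() const { return sent_; }

private:
    std::function<void()> pending_;
    std::vector<std::string> sent_;
};

// Same queueing rule as writeImpl()/write() in the gist, minus strand and errors.
class OutboxModel {
public:
    explicit OutboxModel(FakeAsyncSocket& socket) : socket_(socket) {}

    void write(const std::string& message) {
        outbox_.push_back(message);
        if (outbox_.size() > 1) {
            // outstanding write: its completion handler will pick this message up
            return;
        }
        startNext();
    }

    std::size_t queued() const { return outbox_.size(); }

private:
    void startNext() {
        socket_.start(outbox_.front(), [this]() {
            outbox_.pop_front();
            if (!outbox_.empty()) {
                startNext();  // more messages to send
            }
        });
    }

    FakeAsyncSocket& socket_;
    std::deque<std::string> outbox_;
};
```

Calling `write()` three times in a row starts only the first write; the second and third start one at a time as `complete()` is called, so the fake socket never sees two writes at once.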