// BenchIOMultiplexing - GitHub Gist by @Gerold103
// (gist 9f4ba2baad1ea59cf8a2dbbb8a82df1e), last active February 1, 2024.
#include <atomic>
#include <cassert>
#include <cerrno>
#include <cstdint>
#include <cstring>
#include <ctime>
#include <functional>
#include <iostream>
#include <sstream>
#include <thread>
#include <vector>

#include <fcntl.h>
#include <netinet/in.h>
#include <poll.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>
// Event multiplexing backend used by IOCore. Exactly one of these must be 1.
// They can be chosen from the build (e.g. -DUSE_EPOLL=1) instead of editing
// the file; when none is given, the busy-polling USE_NONE backend is used.
#if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(USE_SELECT) && \
	!defined(USE_NONE)
#define USE_NONE 1
#endif
#ifndef USE_EPOLL
#define USE_EPOLL 0
#endif
#ifndef USE_POLL
#define USE_POLL 0
#endif
#ifndef USE_SELECT
#define USE_SELECT 0
#endif
#ifndef USE_NONE
#define USE_NONE 0
#endif
#if USE_EPOLL + USE_POLL + USE_SELECT + USE_NONE != 1
#error "Must choose one multiplexing method"
#endif
// Expands to an expression with no run-time effect: the arguments only appear
// inside an unevaluated sizeof, so disabled log statements still type-check.
#define MAYBE_UNUSED(...) ((void)sizeof(1, ##__VA_ARGS__))
// Formats the whole line into a local stream first and writes it to std::cerr
// with a single call, making interleaving between threads less likely.
#define LOG_IMPL(...) \
	do \
	{ \
		std::stringstream ss; \
		ss << __VA_ARGS__ << std::endl; \
		std::cerr << ss.str(); \
	} while (false)
#define LOG_NOP(...) MAYBE_UNUSED(std::cerr << __VA_ARGS__)
// Enable the logging by defining LOG_DEBUG as LOG_IMPL if needed.
#define LOG_DEBUG LOG_NOP
// Prefixes the message with "Class(this)::method: ". Goes through LOG_DEBUG so
// that the single switch above enables these messages as well.
#define LOG_THIS_DEBUG(name, method, ...) \
	LOG_DEBUG(#name "(" << this << ")::" #method << ": " << __VA_ARGS__)
// Declared with the same class-key as its definition below (struct).
struct IOCore;
// Number of bytes every client exchanges (one at a time) with its peer.
static constexpr uint64_t theRequestTargetCount = 1000;
static constexpr int theClientCountPerWave = 1000;
static constexpr int theWaveCount = 5;
static constexpr int theTotalClientCount = theClientCountPerWave * theWaveCount;
#if USE_EPOLL
// Maximum number of events fetched by one epoll_wait() call.
static constexpr int theEpollBatchSize = 128;
#endif
// ioCoreRunF() calls IOCore::yield() once per this many roll() calls.
static constexpr int theYieldPeriod = 1;
#if USE_NONE
// Sleep done by IOCore::yield() in the USE_NONE backend; 0 disables it.
static constexpr int theYieldSleepMs = 0;
#endif
// How many recv()/send() calls reported "would block", summed over all cores.
static std::atomic_uint64_t theStatWouldBlockReadCount{0};
static std::atomic_uint64_t theStatWouldBlockWriteCount{0};
// Bits of the event mask passed to IOSubscription callbacks and to
// IOCore::update().
enum IOEventBit
{
	IO_EVENT_READ = 1 << 0,
	IO_EVENT_WRITE = 1 << 1,
};
// Lifecycle of an IOSubscription inside its IOCore.
enum IOSubscriptionState
{
	// Created by subscribe() and queued, not yet added to IOCore::mySubs.
	IO_SUB_STATE_NEW = 0,
	// Present in IOCore::mySubs, receives events.
	IO_SUB_STATE_WORKING = 1,
	// Queued for removal; freed by IOCore::processQueues().
	IO_SUB_STATE_DELETING = 2,
};
// File-local helpers, defined after main().
static uint64_t getUsec();
static void makeFdNonblock(int fd);
static void ioCoreRunF(IOCore &core);
//////////////////////////////////////////////////////////////////////////////////////////
// Callback invoked by IOCore::roll() with the IO_EVENT_* bits that fired.
using OnIOEventCb = std::function<void(int)>;
// A file descriptor registered in an IOCore together with its callback. The
// subscription owns the descriptor: the destructor closes it.
struct IOSubscription
{
	IOSubscription(
		IOCore &core,
		int fd,
		const OnIOEventCb& cb)
		: myState(IO_SUB_STATE_NEW)
		, myCb(cb)
		, myFd(fd)
		, myIdx(-1)
		, myMask(0)
		, myCore(core)
	{}
	// Owns myFd, so a copy would close the same descriptor twice.
	IOSubscription(const IOSubscription&) = delete;
	IOSubscription& operator=(const IOSubscription&) = delete;
	~IOSubscription()
	{
		assert(myState == IO_SUB_STATE_DELETING);
		assert(myFd >= 0);
		int rc = ::close(myFd);
		assert(rc == 0);
		// Keeps NDEBUG builds free of an unused-variable warning.
		(void)rc;
	}
	// Hands the subscription back to its core for removal.
	void
	close();
	// Sets the IO_EVENT_* bits the owner is interested in.
	void
	update(int mask);
	IOSubscriptionState myState;
	const OnIOEventCb myCb;
	const int myFd;
	// Position in IOCore::mySubs (and in the poll array); -1 until added.
	int myIdx;
	// IO_EVENT_* bits of interest; 0 until set by update() or by the core.
	int myMask;
	IOCore &myCore;
};
//////////////////////////////////////////////////////////////////////////////////////////
// Event loop over a set of IOSubscription objects. It is driven by one thread
// (ioCoreRunF); stop()/isStopped() use an atomic flag so that another thread
// can request the loop to end.
struct IOCore
{
	IOCore();
	~IOCore();
	// Subscriptions hold a reference back to the core and the core holds raw
	// pointers to them, so a copy would be unsafe.
	IOCore(const IOCore&) = delete;
	IOCore& operator=(const IOCore&) = delete;
	void
	wakeup();
	void
	stop() { myIsStopped.store(true, std::memory_order_relaxed); wakeup(); }
	bool
	isStopped() const { return myIsStopped.load(std::memory_order_relaxed); }
	IOSubscription *
	subscribe(
		int fd,
		const OnIOEventCb& cb);
	void
	unsubscribe(
		IOSubscription *s);
	void
	update(
		IOSubscription *s,
		int mask);
	void
	roll();
	void
	yield();
	void
	processQueues();
#if USE_EPOLL || USE_POLL || USE_SELECT
	int myEventFd = -1;
	IOSubscription *myEventSub = nullptr;
#endif
#if USE_EPOLL
	int myFd = -1;
#elif USE_POLL
	std::vector<pollfd> myPoll;
#endif
	std::atomic_bool myIsStopped{false};
	// Subscriptions currently receiving events.
	std::vector<IOSubscription *> mySubs;
	// Pending add/remove requests, applied by processQueues().
	std::vector<IOSubscription *> myQueue;
	// Incremented per queued request and reset to mySubs.size() by
	// processQueues(). Before C++20 a default-constructed atomic is left
	// uninitialized, hence the explicit initial value.
	std::atomic_uint64_t mySize{0};
	size_t myMaxSize = 0;
};
//////////////////////////////////////////////////////////////////////////////////////////
// One end of a connection exchanging single bytes in lock-step with its
// peer. Used both for the connecting side and for the server-side ends of
// accepted connections.
struct Client
{
	Client();
	~Client();
	// The subscription callback is bound to `this`; a copy would keep
	// pointing at the original object.
	Client(const Client&) = delete;
	Client& operator=(const Client&) = delete;
	void
	onIOEvent(
		int mask);
	void
	connect(
		IOCore &core,
		uint16_t port);
	void
	wrap(
		IOCore &core,
		int sock);
	bool
	isFinished() const { return myIsFinished.load(std::memory_order_relaxed); }
	IOSubscription *mySub;
	// Bytes received / sent so far; the exchange ends at theRequestTargetCount.
	uint64_t myRecvCount;
	uint64_t mySendCount;
	// Set from the core's thread, polled by main() through isFinished().
	std::atomic_bool myIsFinished;
};
//////////////////////////////////////////////////////////////////////////////////////////
// Listening side of the benchmark: accepts connections and serves each one
// with a Client object driven by the same IOCore.
struct Server
{
	Server();
	~Server();
	// The accept callback is bound to `this`; a copy would keep pointing at
	// the original object.
	Server(const Server&) = delete;
	Server& operator=(const Server&) = delete;
	uint16_t
	bindAndListen(
		IOCore &core);
	void
	onIOEvent(
		int mask);
	IOSubscription *mySub;
	// Owned; deleted in ~Server().
	std::vector<Client *> myClients;
};
//////////////////////////////////////////////////////////////////////////////////////////
// A batch of clients sharing one IOCore; main() runs the waves one at a time.
struct ClientWave
{
	// Declared explicitly: the deleted copy constructor below would otherwise
	// suppress the implicit default constructor used by main().
	ClientWave() = default;
	~ClientWave();
	// Owns the raw Client pointers (deleted in the destructor), so a copy
	// would delete them twice.
	ClientWave(const ClientWave&) = delete;
	ClientWave& operator=(const ClientWave&) = delete;
	bool
	areAllFinished() const;
	IOCore myCore;
	std::vector<Client *> myClients;
};
//////////////////////////////////////////////////////////////////////////////////////////
int main()
{
IOCore serverCore;
std::cout << "start server" << std::endl;
Server server;
uint16_t port = server.bindAndListen(serverCore);
std::thread serverThread(ioCoreRunF, std::ref(serverCore));
std::cout << "start clients" << std::endl;
std::vector<ClientWave *> clientWaves;
clientWaves.resize(theWaveCount);
for (ClientWave *&w : clientWaves)
w = new ClientWave();
for (int i = 0; i < theClientCountPerWave; ++i)
{
for (ClientWave *w : clientWaves)
{
Client *c = new Client();
w->myClients.push_back(c);
c->connect(w->myCore, port);
}
}
std::cout << "wait for the load to pass" << std::endl;
uint64_t t1 = getUsec();
for (int i = 0; i < theWaveCount; ++i)
{
ClientWave &w = *clientWaves[i];
uint64_t tw1 = getUsec();
std::cout << "Wave " << i + 1 << std::endl;
std::thread worker(ioCoreRunF, std::ref(w.myCore));
while (!w.areAllFinished())
usleep(1000);
w.myCore.stop();
worker.join();
uint64_t tw2 = getUsec();
std::cout << "Took " << (tw2 - tw1) / 1000.0 << " ms" << std::endl;
}
uint64_t t2 = getUsec();
std::cout << "wait for the server to stop" << std::endl;
serverCore.stop();
serverThread.join();
std::cout << (t2 - t1) / 1000.0 << " ms" << std::endl;
std::cout << "Max server core size: " << serverCore.myMaxSize << std::endl;
for (int i = 0; i < theWaveCount; ++i)
{
std::cout << "Max wave " << i + 1 << " core size: "
<< clientWaves[i]->myCore.myMaxSize << std::endl;
}
std::cout << "Would block reads: " << theStatWouldBlockReadCount << std::endl;
std::cout << "Would block writes: " << theStatWouldBlockWriteCount << std::endl;
for (ClientWave *w : clientWaves)
{
delete w;
}
return 0;
}
//////////////////////////////////////////////////////////////////////////////////////////
// Returns the CLOCK_MONOTONIC time in microseconds.
static uint64_t
getUsec()
{
	timespec t;
	int rc = clock_gettime(CLOCK_MONOTONIC, &t);
	assert(rc == 0);
	(void)rc;
	// Widen before multiplying: with a 32-bit time_t the product overflows.
	return static_cast<uint64_t>(t.tv_sec) * 1'000'000 +
		static_cast<uint64_t>(t.tv_nsec) / 1000;
}
// Adds O_NONBLOCK to the descriptor's status flags, keeping the other flags.
static void
makeFdNonblock(
	int fd)
{
	const int flags = fcntl(fd, F_GETFL, 0);
	assert(flags >= 0);
	const int setRc = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
	assert(setRc == 0);
	(void)setRc;
}
// Thread body for an IOCore: rolls it until stop() is requested and yields
// every theYieldPeriod iterations.
static void
ioCoreRunF(IOCore &core)
{
	for (uint32_t iteration = 1; !core.isStopped(); ++iteration)
	{
		core.roll();
		if (iteration % theYieldPeriod == 0)
			core.yield();
	}
}
//////////////////////////////////////////////////////////////////////////////////////////
inline void
IOSubscription::close()
{
myCore.unsubscribe(this);
}
inline void
IOSubscription::update(int mask)
{
myCore.update(this, mask);
}
//////////////////////////////////////////////////////////////////////////////////////////
// Creates the backend (epoll instance when USE_EPOLL) and, for the blocking
// backends, an eventfd subscribed for reading so that wakeup() can interrupt
// a wait.
inline
IOCore::IOCore()
	:
#if USE_EPOLL
	myFd(epoll_create1(0)),
#endif
	// Atomics are initialized rather than assigned: a default-constructed
	// std::atomic is left uninitialized before C++20.
	myIsStopped(false)
	, mySize(0)
	, myMaxSize(0)
{
#if USE_EPOLL
	assert(myFd >= 0);
#endif
#if USE_EPOLL || USE_POLL || USE_SELECT
	myEventFd = eventfd(0, EFD_NONBLOCK);
	assert(myEventFd >= 0);
	myEventSub = subscribe(myEventFd, [](int) {});
	myEventSub->update(IO_EVENT_READ);
#endif
}
// Drops the wakeup subscription, applies every pending request and releases
// the backend. All other subscriptions must already be unsubscribed.
IOCore::~IOCore()
{
#if USE_EPOLL || USE_POLL || USE_SELECT
	unsubscribe(myEventSub);
	myEventSub = nullptr;
	// The eventfd itself is closed by its IOSubscription when it is freed.
	myEventFd = -1;
#endif
	processQueues();
	assert(mySubs.empty());
	assert(myQueue.empty());
#if USE_EPOLL
	assert(myFd >= 0);
	// Declared only where it is used, so other backends get no
	// unused-variable warning.
	int rc = close(myFd);
	assert(rc == 0);
	(void)rc;
#elif USE_POLL
	assert(myPoll.empty());
#endif
}
// Signals the eventfd that the constructor subscribed for reading, so that a
// roll() blocked in the backend gets an event. No-op for USE_NONE.
void IOCore::wakeup()
{
#if USE_EPOLL || USE_POLL || USE_SELECT
	const uint64_t increment = 1;
	const ssize_t written = write(myEventFd, &increment, sizeof(increment));
	assert(written == static_cast<ssize_t>(sizeof(increment)));
	(void)written;
#endif
}
// Creates a subscription for fd and queues it; it starts receiving events
// after the next processQueues(). The returned object is owned by the core.
IOSubscription *
IOCore::subscribe(
	int fd,
	const OnIOEventCb& cb)
{
	// Debug check: a descriptor may be registered only once.
	for (const IOSubscription *active : mySubs)
		assert(active->myFd != fd);
	for (const IOSubscription *pending : myQueue)
		assert(pending->myFd != fd);
	IOSubscription *created = new IOSubscription(*this, fd, cb);
	myQueue.push_back(created);
	mySize.fetch_add(1, std::memory_order_relaxed);
	return created;
}
// Removes a subscription. A working subscription is marked DELETING and
// queued; processQueues() frees it. One still waiting in the queue
// (IO_SUB_STATE_NEW) is freed right away.
void
IOCore::unsubscribe(
	IOSubscription *s)
{
	if (s->myState == IO_SUB_STATE_NEW)
	{
		// Not yet processed: it is neither in mySubs nor registered in the
		// backend, so taking it out of the queue is enough. This happens e.g.
		// when a core is destroyed without ever rolling (its eventfd
		// subscription is still new).
		for (size_t i = 0; i < myQueue.size(); ++i)
		{
			if (myQueue[i] != s)
				continue;
			myQueue.erase(myQueue.begin() + i);
			mySize.fetch_sub(1, std::memory_order_relaxed);
			s->myState = IO_SUB_STATE_DELETING;
			delete s;
			return;
		}
		// A NEW subscription must be in the queue.
		assert(false);
		return;
	}
	for (IOSubscription *is : myQueue)
		assert(is != s);
	assert(s->myIdx >= 0 && static_cast<size_t>(s->myIdx) < mySubs.size());
	assert(mySubs[s->myIdx] == s);
	assert(s->myState == IO_SUB_STATE_WORKING);
	s->myState = IO_SUB_STATE_DELETING;
	myQueue.push_back(s);
	mySize.fetch_add(1, std::memory_order_relaxed);
}
// Stores the new interest mask. With USE_POLL a working subscription's
// pollfd entry is updated immediately as well.
void
IOCore::update(
	IOSubscription *s,
	int mask)
{
	const bool isWorking = s->myState == IO_SUB_STATE_WORKING;
	assert(!isWorking || mySubs[s->myIdx] == s);
	(void)isWorking;
#if USE_POLL
	const int pollMask = ((mask & IO_EVENT_READ) != 0 ? POLLIN : 0) |
		((mask & IO_EVENT_WRITE) != 0 ? POLLOUT : 0);
	assert(pollMask != 0);
	if (isWorking)
		myPoll[s->myIdx].events = pollMask;
#endif
	s->myMask = mask;
}
// One iteration of the event loop: applies queued requests, waits for
// readiness with the selected backend (USE_NONE does not wait) and invokes
// the callbacks with the IO_EVENT_* bits that fired.
void
IOCore::roll()
{
	// Requests made by callbacks below only touch myQueue, so mySubs stays
	// stable while it is iterated.
	processQueues();
	if (mySubs.empty())
		return;
#if USE_EPOLL
	epoll_event evs[theEpollBatchSize];
	int rc = epoll_wait(myFd, evs, theEpollBatchSize, -1);
	if (rc < 0 && errno == EINTR)
		return;
	assert(rc >= 0);
	LOG_THIS_DEBUG(IOCore, roll, rc << " events");
	for (int i = 0; i < rc; ++i)
	{
		epoll_event& ev = evs[i];
		IOSubscription *s = static_cast<IOSubscription *>(ev.data.ptr);
		int mask = 0;
		if ((ev.events & EPOLLIN) != 0)
			mask |= IO_EVENT_READ;
		if ((ev.events & EPOLLOUT) != 0)
			mask |= IO_EVENT_WRITE;
		// Any other bit (error, hang-up) is treated as unexpected.
		assert((ev.events & ~(EPOLLIN | EPOLLOUT)) == 0);
		assert(mask != 0);
		s->myCb(mask);
	}
#elif USE_POLL
	int rc = poll(myPoll.data(), myPoll.size(), -1);
	if (rc < 0 && errno == EINTR)
		return;
	assert(rc >= 0);
	LOG_THIS_DEBUG(IOCore, roll, rc << " events");
	// myPoll[i] describes mySubs[i]. Stop once all rc ready entries are done.
	for (size_t i = 0; i < mySubs.size(); ++i)
	{
		pollfd& ev = myPoll[i];
		if (ev.revents == 0)
			continue;
		assert(rc > 0);
		--rc;
		IOSubscription *s = mySubs[i];
		int mask = 0;
		if ((ev.revents & POLLIN) != 0)
			mask |= IO_EVENT_READ;
		if ((ev.revents & POLLOUT) != 0)
			mask |= IO_EVENT_WRITE;
		assert((ev.revents & ~(POLLIN | POLLOUT)) == 0);
		assert(mask != 0);
		s->myCb(mask);
		if (rc == 0)
			break;
	}
#elif USE_SELECT
	fd_set readSet;
	FD_ZERO(&readSet);
	fd_set writeSet;
	FD_ZERO(&writeSet);
	int maxFd = -1;
	for (IOSubscription *s : mySubs)
	{
		assert(s->myMask != 0);
		// FD_SET() with a descriptor outside [0, FD_SETSIZE) is undefined
		// behaviour, and this benchmark opens thousands of sockets.
		assert(s->myFd >= 0 && s->myFd < FD_SETSIZE);
		if ((s->myMask & IO_EVENT_READ) != 0)
			FD_SET(s->myFd, &readSet);
		if ((s->myMask & IO_EVENT_WRITE) != 0)
			FD_SET(s->myFd, &writeSet);
		if (s->myFd > maxFd)
			maxFd = s->myFd;
	}
	int rc = select(maxFd + 1, &readSet, &writeSet, nullptr, nullptr);
	if (rc < 0 && errno == EINTR)
		return;
	assert(rc >= 0);
	LOG_THIS_DEBUG(IOCore, roll, rc << " events");
	// select() counts every set bit in both sets; stop when all are handled.
	for (IOSubscription *s : mySubs)
	{
		int mask = 0;
		if ((s->myMask & IO_EVENT_READ) != 0 && FD_ISSET(s->myFd, &readSet))
		{
			assert(rc > 0);
			--rc;
			mask |= IO_EVENT_READ;
		}
		if ((s->myMask & IO_EVENT_WRITE) != 0 && FD_ISSET(s->myFd, &writeSet))
		{
			assert(rc > 0);
			--rc;
			mask |= IO_EVENT_WRITE;
		}
		if (mask == 0)
			continue;
		s->myCb(mask);
		if (rc == 0)
			break;
	}
#elif USE_NONE
	// No readiness information: each subscription is called with its whole
	// interest mask and the callback copes with "would block" results.
	LOG_THIS_DEBUG(IOCore, roll, "");
	for (IOSubscription *s : mySubs)
	{
		int mask = 0;
		if ((s->myMask & IO_EVENT_READ) != 0)
			mask |= IO_EVENT_READ;
		if ((s->myMask & IO_EVENT_WRITE) != 0)
			mask |= IO_EVENT_WRITE;
		if (mask != 0)
			s->myCb(mask);
	}
#endif
}
// Gives up the CPU between rolls. Only the USE_NONE backend needs it, as the
// others block in the kernel; it sleeps only when theYieldSleepMs > 0.
void
IOCore::yield()
{
#if USE_NONE
	if (theYieldSleepMs <= 0)
		return;
	usleep(theYieldSleepMs * 1000);
#endif
}
// Applies the queued subscribe/unsubscribe requests. New subscriptions are
// appended to mySubs and registered in the backend. Deleted ones are
// swap-removed and freed. Also tracks the maximum number of subscriptions.
void
IOCore::processQueues()
{
	if (myQueue.empty())
		return;
	for (IOSubscription *s : myQueue)
	{
		if (s->myState == IO_SUB_STATE_NEW)
		{
			LOG_THIS_DEBUG(IOCore, processQueues, "add " << s);
			s->myState = IO_SUB_STATE_WORKING;
			// Keep a mask set via update() while the subscription was queued
			// (IOCore() subscribes its eventfd for reading only). Overwriting
			// it would put the eventfd, writable almost always, into the
			// poll()/select() write interest and make every wait return at
			// once. Subscriptions without a mask get both directions.
			if (s->myMask == 0)
				s->myMask = IO_EVENT_READ | IO_EVENT_WRITE;
			s->myIdx = static_cast<int>(mySubs.size());
#if USE_EPOLL
			epoll_event ev;
			memset(&ev, 0, sizeof(ev));
			// Registered edge-triggered for both directions; update() never
			// modifies this registration.
			ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
			ev.data.ptr = (void *)s;
			int rc = epoll_ctl(myFd, EPOLL_CTL_ADD, s->myFd, &ev);
			assert(rc == 0);
			(void)rc;
#elif USE_POLL
			pollfd p;
			memset(&p, 0, sizeof(p));
			p.fd = s->myFd;
			if ((s->myMask & IO_EVENT_READ) != 0)
				p.events |= POLLIN;
			if ((s->myMask & IO_EVENT_WRITE) != 0)
				p.events |= POLLOUT;
			myPoll.push_back(p);
#endif
			mySubs.push_back(s);
		}
		else if (s->myState == IO_SUB_STATE_DELETING)
		{
			assert(s->myIdx >= 0 && mySubs.size() > static_cast<size_t>(s->myIdx));
			assert(mySubs[s->myIdx] == s);
			assert(s->myFd >= 0);
			LOG_THIS_DEBUG(IOCore, processQueues, "drop " << s);
			// Swap-remove: the last subscription takes over the freed slot.
			mySubs.back()->myIdx = s->myIdx;
			mySubs[s->myIdx] = mySubs.back();
#if USE_EPOLL
			int rc = epoll_ctl(myFd, EPOLL_CTL_DEL, s->myFd, nullptr);
			assert(rc == 0);
			(void)rc;
#elif USE_POLL
			myPoll[s->myIdx] = myPoll.back();
			myPoll.resize(myPoll.size() - 1);
#endif
			delete s;
			mySubs.resize(mySubs.size() - 1);
		}
		else
		{
			assert(false);
		}
	}
	if (mySubs.size() > myMaxSize)
		myMaxSize = mySubs.size();
	myQueue.clear();
	mySize.store(mySubs.size(), std::memory_order_relaxed);
}
//////////////////////////////////////////////////////////////////////////////////////////
Client::Client()
: mySub(nullptr)
, myRecvCount(0)
, mySendCount(0)
, myIsFinished(false)
{}
// Releases the client's subscription (and thereby its socket) if it has one.
Client::~Client()
{
	LOG_THIS_DEBUG(Client, ~Client, "drop " << mySub);
	// A client that never went through connect()/wrap() has nothing to close.
	if (mySub != nullptr)
		mySub->close();
}
// Event handler: reads at most one byte, sends at most one byte when the
// previous one was answered, then updates the interest mask. Marks the
// client finished once theRequestTargetCount bytes were received.
void
Client::onIOEvent(
	int mask)
{
	LOG_THIS_DEBUG(Client, onIOEvent, "mask " << mask);
	// A byte received below is the byte sent next. Zero-initialized so a send
	// without a preceding successful recv transmits a defined value.
	uint8_t data = 0;
	ssize_t rc;
	if ((mask & IO_EVENT_READ) != 0)
	{
		LOG_THIS_DEBUG(Client, onIOEvent, "need to read");
		rc = recv(mySub->myFd, &data, 1, 0);
		if (rc < 0)
		{
			LOG_THIS_DEBUG(Client, onIOEvent, "read wouldblock");
			// POSIX allows either errno value for a non-blocking socket.
			assert(errno == EWOULDBLOCK || errno == EAGAIN);
			theStatWouldBlockReadCount.fetch_add(1, std::memory_order_relaxed);
		}
		else if (rc == 0)
		{
			// The peer closed the connection, which is not expected while the
			// benchmark runs. EOF must not be counted as a received byte, and
			// sending to a closed peer may raise SIGPIPE, so stop here.
			LOG_THIS_DEBUG(Client, onIOEvent, "peer closed");
			assert(false);
			return;
		}
		else
		{
			assert(rc == 1);
			++myRecvCount;
			LOG_THIS_DEBUG(Client, onIOEvent, "read ok, count = " << myRecvCount);
			if (myRecvCount >= theRequestTargetCount)
			{
				LOG_THIS_DEBUG(Client, onIOEvent, "is finished");
				assert(!isFinished());
				myIsFinished.store(true, std::memory_order_relaxed);
			}
		}
	}
	// Keep a single byte in flight: send only after the previous one was
	// answered, and only until the target is reached.
	if ((mask & IO_EVENT_WRITE) != 0 && myRecvCount >= mySendCount &&
		mySendCount < theRequestTargetCount)
	{
		rc = send(mySub->myFd, &data, 1, 0);
		if (rc < 0)
		{
			LOG_THIS_DEBUG(Client, onIOEvent, "send wouldblock");
			assert(errno == EWOULDBLOCK || errno == EAGAIN);
			theStatWouldBlockWriteCount.fetch_add(1, std::memory_order_relaxed);
		}
		else
		{
			LOG_THIS_DEBUG(Client, onIOEvent, "send ok");
			assert(rc == 1);
			++mySendCount;
		}
	}
	// Ask for writability only while the next byte may be sent.
	if (myRecvCount >= mySendCount && myRecvCount < theRequestTargetCount)
		mySub->update(IO_EVENT_READ | IO_EVENT_WRITE);
	else
		mySub->update(IO_EVENT_READ);
}
// Opens a TCP connection to 127.0.0.1:port (blocking connect) and hands the
// socket to wrap().
void
Client::connect(
	IOCore &core,
	uint16_t port)
{
	LOG_THIS_DEBUG(Client, connect, "");
	sockaddr_in peer;
	memset(&peer, 0, sizeof(peer));
	peer.sin_family = AF_INET;
	peer.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	peer.sin_port = htons(port);
	const int fd = socket(AF_INET, SOCK_STREAM, 0);
	assert(fd >= 0);
	const int connectRc = ::connect(fd, reinterpret_cast<sockaddr *>(&peer), sizeof(peer));
	assert(connectRc >= 0);
	(void)connectRc;
	wrap(core, fd);
}
// Takes ownership of an already connected socket: makes it non-blocking and
// subscribes it to the core with onIOEvent() as the callback.
void
Client::wrap(
	IOCore &core,
	int sock)
{
	assert(mySub == nullptr);
	makeFdNonblock(sock);
	// The callback captures `this`.
	OnIOEventCb onEvent = [this](int mask) { onIOEvent(mask); };
	mySub = core.subscribe(sock, onEvent);
	LOG_THIS_DEBUG(Client, wrap, mySub);
}
//////////////////////////////////////////////////////////////////////////////////////////
inline
Server::Server() : mySub(nullptr) {}
// Closes the listening subscription (if any) and deletes every accepted
// client, which in turn unsubscribes their sockets.
inline
Server::~Server()
{
	// bindAndListen() may never have been called.
	if (mySub != nullptr)
		mySub->close();
	for (Client *c : myClients)
		delete c;
}
// Creates a non-blocking listening TCP socket on an ephemeral loopback port,
// subscribes it to the core and returns the chosen port (host byte order).
uint16_t
Server::bindAndListen(
	IOCore &core)
{
	sockaddr_in addr;
	memset(&addr, 0, sizeof(addr));
	addr.sin_family = AF_INET;
	// Clients only connect through the loopback address, so the echo server
	// does not need to be reachable on every interface. The zeroed port asks
	// the kernel for a free one, read back via getsockname() below.
	addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
	socklen_t len = sizeof(addr);
	int sock = socket(AF_INET, SOCK_STREAM, 0);
	assert(sock >= 0);
	int value = 1;
	int rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &value, sizeof(value));
	assert(rc == 0);
	rc = bind(sock, (sockaddr *)&addr, len);
	assert(rc == 0);
	rc = listen(sock, SOMAXCONN);
	assert(rc == 0);
	makeFdNonblock(sock);
	mySub = core.subscribe(sock,
		std::bind(&Server::onIOEvent, this, std::placeholders::_1));
	LOG_THIS_DEBUG(Server, bindAndListen, mySub);
	rc = getsockname(sock, (sockaddr *)&addr, &len);
	assert(rc == 0);
	(void)rc;
	assert(addr.sin_family == AF_INET);
	return ntohs(addr.sin_port);
}
// Accept handler: on readability of the listening socket, accepts every
// pending connection and serves each with a new Client on the same core.
void
Server::onIOEvent(
	int mask)
{
	if ((mask & IO_EVENT_READ) == 0)
		return;
	sockaddr_in addr;
	// The listening socket is non-blocking, so the loop ends once accept()
	// reports that nothing more is pending.
	while (true)
	{
		socklen_t len = sizeof(addr);
		int sock = accept(mySub->myFd, (sockaddr *)&addr, &len);
		if (sock < 0)
		{
			// Interrupted by a signal, or a connection aborted before it
			// was accepted: the queue is not necessarily empty, retry.
			if (errno == EINTR || errno == ECONNABORTED)
				continue;
			assert(errno == EWOULDBLOCK || errno == EAGAIN);
			return;
		}
		LOG_THIS_DEBUG(Server, onIOEvent, "new client");
		Client *c = new Client();
		myClients.push_back(c);
		c->wrap(mySub->myCore, sock);
	}
}
//////////////////////////////////////////////////////////////////////////////////////////
// Deletes the wave's clients; each one unsubscribes from myCore, which is
// destroyed only after this body and then applies those requests.
ClientWave::~ClientWave()
{
	for (size_t i = 0; i < myClients.size(); ++i)
		delete myClients[i];
}
bool
ClientWave::areAllFinished() const
{
for (const Client *c : myClients)
{
if (!c->isFinished())
return false;
}
return true;
}
// (GitHub page footer) Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.