Benchmark Server Implementation
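This gist contains the receiver side of a transport benchmark for the kayon runtime, followed by the benchmark_transport.yaml configuration it loads. Depending on the flags in the benchmark: section, it measures throughput and packets per second, per-packet latency split into NIC-to-host, GPU-kernel, and end-to-end components, or CPU usage sampled each second from /proc/stat together with context-switch counts from getrusage(2). Packets arrive over one of three channels (ZMQ, io_uring, or RDMA) into a CPU- or CUDA-pinned buffer and are handed to a GPU wake-up kernel; each packet must be a whole number of fixed-layout tuples (int32 id, 64-byte name, float score).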
#include <iostream>
#include <fstream>
#include <sstream>
#include <string>
#include <vector>
#include <deque>    // timestamp_nic_queue
#include <atomic>   // stopMonitoring
#include <memory>   // std::shared_ptr
#include <thread>
#include <chrono>
#include <cstdint>
#include <any>
#include <yaml-cpp/yaml.h>
#include <sys/resource.h>
#include <sys/time.h>
#include "utils/logger.hpp"
#include "utils/channel_descriptor_util.hpp"
#include "tuples/tuple_buffer.hpp"
#include "tuples/schema.hpp"
#include "kayon.pb.h"
#include "transport/rdma_channel.hpp"
#include "transport/iouring_channel.hpp"
#include "transport/zmq_channel.hpp"
#include "operators/gpu_wakeup_kernel.cuh"
#include "cuda_buffer_allocator.hpp"
#include "cpu_buffer_allocator.hpp"
// Shared counters and timing state, written from the channel callbacks.
uint64_t total_bytes_processed = 0;
uint64_t total_packets = 0;
uint64_t total_received_tuple = 0;
std::deque<std::chrono::high_resolution_clock::time_point> timestamp_nic_queue; // NIC timestamps (RDMA path)
std::chrono::high_resolution_clock::time_point timestamp_nic;                   // latest NIC timestamp (ZMQ/io_uring path)
std::chrono::high_resolution_clock::time_point timestamp_gpu;
double total_host_latency = 0.0;
double total_gpu_latency = 0.0;
double total_full_latency = 0.0;
double total_nicGpu_latency = 0.0;
int latency_count = 0;
double cpu_usage = 0.0;
std::atomic<bool> stopMonitoring(false);
std::vector<double> cpuUsageData;

// Cumulative CPU times parsed from the aggregate "cpu" line of /proc/stat.
struct CpuTimes {
    double user, nice, system, idle, iowait, irq, softirq, steal;
    double total;
};
double getCPUTime() {
    struct rusage usage;            // holds resource-usage information for this process
    getrusage(RUSAGE_SELF, &usage); // get the current CPU usage
    return (usage.ru_utime.tv_sec + usage.ru_utime.tv_usec / 1e6) +  // CPU time in user space
           (usage.ru_stime.tv_sec + usage.ru_stime.tv_usec / 1e6);   // CPU time in kernel space
}
CpuTimes getCpuUsage() {
    std::ifstream file("/proc/stat");
    std::string line, cpu;
    CpuTimes times{};
    if (file.is_open() && std::getline(file, line)) {
        std::istringstream iss(line);
        iss >> cpu >> times.user >> times.nice >> times.system >> times.idle
            >> times.iowait >> times.irq >> times.softirq >> times.steal;
        times.total = times.user + times.nice + times.system + times.idle +
                      times.iowait + times.irq + times.softirq + times.steal;
    }
    return times;
}
struct CpuUsageResult {
    double total;
    double user;
    double system;
    double idle;
    double iowait;
    double irq;
    double softirq;
};

CpuUsageResult calculateCpuUsage(const CpuTimes& start, const CpuTimes& end) {
    double total_diff = end.total - start.total;
    if (total_diff == 0) return {0, 0, 0, 0, 0, 0, 0};
    CpuUsageResult result;
    result.total = ((end.user + end.system + end.irq + end.softirq) -
                    (start.user + start.system + start.irq + start.softirq)) / total_diff * 100.0;
    result.user    = (end.user - start.user) / total_diff * 100.0;
    result.system  = (end.system - start.system) / total_diff * 100.0;
    result.idle    = (end.idle - start.idle) / total_diff * 100.0;
    result.iowait  = (end.iowait - start.iowait) / total_diff * 100.0;
    result.irq     = (end.irq - start.irq) / total_diff * 100.0;
    result.softirq = (end.softirq - start.softirq) / total_diff * 100.0;
    return result;
}
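// Worked example: if over one 1-second sample user grows by 300 jiffies,
// system by 100, and idle by 600 (nothing else changes), total_diff = 1000,
// so result.total = (300 + 100) / 1000 * 100 = 40% and result.idle = 60%.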
void monitorCPUUsage(int durationBefore, int session, int durationAfter) {
    int totalDuration = durationBefore + session + durationAfter;
    std::ofstream file("16rdmaSendCpu_usage_results.csv");
    file << "Time,CPU_Usage(%),User(%),System(%),Idle(%),IOWait(%),IRQ(%),SoftIRQ(%)\n";
    for (int i = 0; i < totalDuration; ++i) {
        if (stopMonitoring.load()) break;
        CpuTimes start = getCpuUsage();
        std::this_thread::sleep_for(std::chrono::seconds(1));
        if (stopMonitoring.load()) break;
        CpuTimes end = getCpuUsage();
        CpuUsageResult usage = calculateCpuUsage(start, end);
        LOG_INFO("[CPU MONITOR] Time %d sec | Total: %.2f%% | User: %.2f%% | System: %.2f%% | Idle: %.2f%% | IOWait: %.2f%% | IRQ: %.2f%% | SoftIRQ: %.2f%%",
                 i - durationBefore, usage.total, usage.user, usage.system, usage.idle, usage.iowait, usage.irq, usage.softirq);
        file << (i - durationBefore) << "," << usage.total << "," << usage.user << ","
             << usage.system << "," << usage.idle << "," << usage.iowait << ","
             << usage.irq << "," << usage.softirq << "\n";
    }
    file.close();
}
auto createSchema() {
    auto schema = std::make_shared<kayon::core::Schema>();
    schema->addField("id", kayon::core::FieldType::INT32);
    schema->addField("name", kayon::core::FieldType::STRING64);
    schema->addField("score", kayon::core::FieldType::FLOAT32);
    return schema;
}
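// Note: with these field types a packed tuple is presumably 4 + 64 + 4 = 72
// bytes (getSchemaSize()'s padding rules are not shown in this gist). The
// YAML's buffer_size of 294912 is then exactly 4096 such tuples, and one
// 512-tuple batch is 36864 bytes.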
void logTupleData(kayon::core::TupleBuffer& tupleBuffer, size_t numTuples, const std::string& message, size_t packetNumber) {
    for (size_t i = 0; i < numTuples; ++i) {
        int32_t id;
        char name[64] = {0};
        float score;
        try {
            tupleBuffer.getField(i, "id", &id);
            tupleBuffer.getField(i, "name", name);
            tupleBuffer.getField(i, "score", &score);
        } catch (const std::exception& e) {
            LOG_ERROR("Error extracting tuple fields: %s", e.what());
            continue;
        }
        LOG_DEBUG("%s Packet %zu-Tuple %zu: ID=%d, Name=%s, Score=%.2f",
                  message.c_str(), packetNumber, i + 1, id, name, score);
    }
}
void saveThroughput(const std::string& filename, const std::string& channel, size_t packet_size, double throughput,
                    uint64_t pps, uint64_t total_data, uint64_t total_packets, uint64_t total_tuples) {
    std::ofstream file;
    file.open(filename, std::ios::app);
    file << channel << ","
         << packet_size << ","
         << throughput << ","
         << pps << ","
         << total_data << ","
         << total_packets << ","
         << total_tuples << "\n";
    file.close();
}

void savePPS(const std::string& filename, const std::string& channel, size_t packet_size, uint64_t pps) {
    std::ofstream file;
    file.open(filename, std::ios::app);
    file << channel << ","
         << packet_size << ","
         << pps << "\n";
    file.close();
}

void saveAverageLatency(const std::string& filename, const std::string& channel, size_t packet_size, double host, double gpu, double full, double nicGpu) {
    std::ofstream file;
    file.open(filename, std::ios::app);
    file << channel << ","
         << packet_size << ","
         << host << ","
         << gpu << ","
         << full << ","
         << nicGpu << "\n";
    file.close();
}

void saveLatency(const std::string& filename, const std::string& channel, size_t packet_size, double host, double gpu, double full, double nicGpu) {
    std::ofstream file;
    file.open(filename, std::ios::app);
    file << channel << ","
         << packet_size << ","
         << host << ","
         << gpu << ","
         << full << ","
         << nicGpu << "\n";
    file.close();
}
void runThroughputBenchmark(int channel, size_t buffer_size, int accelerator, int batch, int session, int duration,
                            const std::string& param_name, const std::string& address) {
    LOG_DEBUG("Running Throughput Benchmark...");
    size_t data_size = 0;
    auto schema = createSchema();
    size_t batch_size = schema->getSchemaSize() * batch;
    kayon::core::TupleBuffer tupleBuffer(batch_size, schema);

    kayon::core::ChannelDescriptor channel_descriptor;
    channel_descriptor.set_type(static_cast<kayon::core::ChannelType>(channel));
    kayon::core::ChannelDescriptorUtil::addParameter(channel_descriptor, "receiver", 1);
    kayon::core::ChannelDescriptorUtil::addParameter(channel_descriptor, param_name, address);

    std::shared_ptr<kayon::core::Channel> channel_ptr;
    switch (channel) {
        case 0: // ZMQ
            channel_ptr = std::make_shared<kayon::zmq::ZMQChannel>(channel_descriptor);
            break;
        case 1: // IOUring
            channel_ptr = std::make_shared<kayon::io_uring::IOUringChannel>(channel_descriptor);
            break;
        case 2: // RDMA
            kayon::core::ChannelDescriptorUtil::addParameter(channel_descriptor, "rdma_device", std::string("mlx5_0"));
            channel_ptr = std::make_shared<kayon::rdma::RDMAChannel>(channel_descriptor);
            break;
        default:
            LOG_ERROR_AND_THROW("Invalid channel value for throughput benchmark: %d", channel);
    }

    channel_ptr->setCallback([&tupleBuffer, &schema, &data_size](void* buffer, size_t size) {
        total_packets++;
        size_t tupleSize = schema->getSchemaSize();
        if (size % tupleSize != 0) {
            LOG_ERROR("Received data size is not a multiple of tuple size! Expected multiple of %zu, got %zu", tupleSize, size);
            return;
        }
        size_t numTuples = size / tupleSize;
        total_received_tuple += numTuples;
        kayon::cuda::launchGpuProcessB(static_cast<char*>(buffer), size);
        //LOG_DEBUG("Packets: %llu", total_packets);
        total_bytes_processed += size;
        data_size = size;
    });

    std::thread receiving_thread([&channel_ptr, accelerator, buffer_size, session] {
        void* buffer = nullptr;
        if (accelerator == 1) { // CUDA
            buffer = kayon::cuda::CUDABufferAllocator::allocatePinnedMemory(buffer_size);
        } else { // CPU
            buffer = kayon::cpu::CPUBufferAllocator::allocatePinnedMemory(buffer_size);
        }
        channel_ptr->open(buffer, buffer_size);
        std::this_thread::sleep_for(std::chrono::seconds(session));
        channel_ptr->close();
        if (accelerator == 1) {
            kayon::cuda::CUDABufferAllocator::freePinnedMemory(buffer);
        } else {
            kayon::cpu::CPUBufferAllocator::freePinnedMemory(buffer, buffer_size);
        }
    });
    receiving_thread.join();

    double throughput = total_bytes_processed / (1024.0 * 1024.0 * duration);
    uint64_t pps = total_packets / duration;
    LOG_INFO("Throughput: %.3f MBps | PPS: %llu | Duration: %d s | Total Data: %llu bytes | Total Packets: %llu | Total Tuples: %llu",
             throughput, pps, duration, total_bytes_processed, total_packets, total_received_tuple);
    /*saveThroughput("throughputIOUring.csv", (channel == 0 ? "ZMQ" : channel == 1 ? "IOUring" : "RDMAWrite"), data_size, throughput,
                   pps, total_bytes_processed, total_packets, total_received_tuple);*/
    savePPS("PPSnew512tuple.csv", (channel == 0 ? "ZMQ" : channel == 1 ? "IOUring" : "RDMASend"), data_size, pps);
    LOG_DEBUG("Throughput Benchmark Completed.");
}
void runLatencyBenchmark(int channel, size_t buffer_size, int accelerator, int batch, int session, int duration,
                         const std::string& param_name, const std::string& address) {
    LOG_DEBUG("Running Latency Benchmark...");
    size_t data_size = 0;
    if (accelerator == 1) {
        kayon::cuda::initializeCudaEvents();
    }
    auto schema = createSchema();
    size_t batch_size = schema->getSchemaSize() * batch;
    kayon::core::TupleBuffer tupleBuffer(batch_size, schema);

    kayon::core::ChannelDescriptor channel_descriptor;
    channel_descriptor.set_type(static_cast<kayon::core::ChannelType>(channel));
    kayon::core::ChannelDescriptorUtil::addParameter(channel_descriptor, "receiver", 1);
    kayon::core::ChannelDescriptorUtil::addParameter(channel_descriptor, param_name, address);

    std::shared_ptr<kayon::core::Channel> channel_ptr;
    switch (channel) {
        case 0: // ZMQ
            channel_ptr = std::make_shared<kayon::zmq::ZMQChannel>(channel_descriptor);
            if (auto zmqChannel = std::dynamic_pointer_cast<kayon::zmq::ZMQChannel>(channel_ptr)) {
                zmqChannel->setLatencyCallback([&](std::chrono::high_resolution_clock::time_point ts_nic) {
                    timestamp_nic = ts_nic;
                });
            }
            break;
        case 1: // IOUring
            channel_ptr = std::make_shared<kayon::io_uring::IOUringChannel>(channel_descriptor);
            if (auto iouringChannel = std::dynamic_pointer_cast<kayon::io_uring::IOUringChannel>(channel_ptr)) {
                iouringChannel->setLatencyCallback([&](std::chrono::high_resolution_clock::time_point ts_nic) {
                    timestamp_nic = ts_nic;
                });
            }
            break;
        case 2: // RDMA
            kayon::core::ChannelDescriptorUtil::addParameter(channel_descriptor, "rdma_device", std::string("mlx5_0"));
            channel_ptr = std::make_shared<kayon::rdma::RDMAChannel>(channel_descriptor);
            if (auto rdmaChannel = std::dynamic_pointer_cast<kayon::rdma::RDMAChannel>(channel_ptr)) {
                rdmaChannel->setLatencyCallback([&](std::chrono::high_resolution_clock::time_point ts_nic) {
                    timestamp_nic_queue.push_back(ts_nic);
                });
            }
            break;
        default:
            LOG_ERROR_AND_THROW("Invalid channel value for latency benchmark: %d", channel);
    }

    channel_ptr->setCallback([&tupleBuffer, &schema, channel, &data_size](void* buffer, size_t size) {
        auto timestamp_host = std::chrono::high_resolution_clock::now();
        size_t tupleSize = schema->getSchemaSize();
        latency_count++;
        if (size % tupleSize != 0) {
            LOG_ERROR("Received data size is not a multiple of tuple size! Expected multiple of %zu, got %zu", tupleSize, size);
            return;
        }
        size_t numTuples = size / tupleSize;
        total_received_tuple += numTuples;
        float gpu_latency = kayon::cuda::launchGpuProcessB(static_cast<char*>(buffer), size);
        auto timestamp_gpu = std::chrono::high_resolution_clock::now();
        if (latency_count == 1) {
            //LOG_DEBUG("[LATENCY] Packet #1 ignored due to warm-up overhead.");
            return;
        }
        if (channel == 2) {
            if (!timestamp_nic_queue.empty()) {
                auto timestamp_nic = timestamp_nic_queue.front();
                timestamp_nic_queue.pop_front(); // remove from the queue once consumed
                double host_latency = std::chrono::duration<double, std::milli>(timestamp_host - timestamp_nic).count();
                double full_latency = std::chrono::duration<double, std::milli>(timestamp_gpu - timestamp_nic).count();
                double nicGpu_latency = full_latency - gpu_latency;
                total_host_latency += host_latency;
                total_gpu_latency += gpu_latency;
                total_full_latency += full_latency;
                total_nicGpu_latency += nicGpu_latency;
                data_size = size;
                saveLatency("RDMAWrite_latency_measure.csv", (channel == 0 ? "ZMQ" : channel == 1 ? "IOUring" : "RDMAWrite"), data_size, host_latency, gpu_latency, full_latency, nicGpu_latency);
            } else {
                latency_count--;
                LOG_WARNING("Timestamp NIC queue is empty, latency calculation skipped!");
            }
        } else {
            double host_latency = std::chrono::duration<double, std::milli>(timestamp_host - timestamp_nic).count();
            double full_latency = std::chrono::duration<double, std::milli>(timestamp_gpu - timestamp_nic).count();
            double nicGpu_latency = full_latency - gpu_latency;
            total_host_latency += host_latency;
            total_gpu_latency += gpu_latency;
            total_full_latency += full_latency;
            total_nicGpu_latency += nicGpu_latency;
            data_size = size;
            //LOG_INFO("Host Latency: %.3f ms | GPU Latency: %.3f ms | FULL Latency: %.3f ms | NIC-GPU Latency: %.3f ms", host_latency, gpu_latency, full_latency, nicGpu_latency);
            saveLatency("iouring_latency_measure.csv", (channel == 0 ? "ZMQ" : channel == 1 ? "IOUring" : "RDMASend"), data_size, host_latency, gpu_latency, full_latency, nicGpu_latency);
        }
    });

    std::thread receiving_thread([&channel_ptr, accelerator, buffer_size, session] {
        void* buffer = nullptr;
        if (accelerator == 1) {
            buffer = kayon::cuda::CUDABufferAllocator::allocatePinnedMemory(buffer_size);
        } else {
            buffer = kayon::cpu::CPUBufferAllocator::allocatePinnedMemory(buffer_size);
        }
        channel_ptr->open(buffer, buffer_size);
        std::this_thread::sleep_for(std::chrono::seconds(session));
        channel_ptr->close();
        if (accelerator == 1) {
            kayon::cuda::CUDABufferAllocator::freePinnedMemory(buffer);
        } else {
            kayon::cpu::CPUBufferAllocator::freePinnedMemory(buffer, buffer_size);
        }
    });
    receiving_thread.join();

    // Packet #1 was discarded as warm-up, hence latency_count - 1 samples.
    double avg_host_latency = total_host_latency / (latency_count - 1);
    double avg_gpu_latency = total_gpu_latency / (latency_count - 1);
    double avg_full_latency = total_full_latency / (latency_count - 1);
    double avg_nicGpu_latency = total_nicGpu_latency / (latency_count - 1);
    LOG_INFO("Avg Host Latency: %.3f ms | Avg GPU Latency: %.3f ms | Avg FULL Latency: %.3f ms | Avg NIC-GPU Latency: %.3f ms | Total Packets: %d",
             avg_host_latency, avg_gpu_latency, avg_full_latency, avg_nicGpu_latency, (latency_count - 1));
    saveAverageLatency("AVGlatency_measure.csv", (channel == 0 ? "ZMQ" : channel == 1 ? "IOUring" : "RDMAWrite"), data_size, avg_host_latency, avg_gpu_latency, avg_full_latency, avg_nicGpu_latency);
    if (accelerator == 1) {
        kayon::cuda::destroyGpuEvents();
    }
    LOG_DEBUG("Latency Benchmark Completed.");
}
void runCpuUsageBenchmark(int channel, size_t buffer_size, int accelerator, int batch, int session,
                          const std::string& param_name, const std::string& address) {
    LOG_DEBUG("Running CPU Usage Benchmark...");
    size_t data_size = 0;
    auto schema = createSchema();
    size_t batch_size = schema->getSchemaSize() * batch;
    kayon::core::TupleBuffer tupleBuffer(batch_size, schema);

    kayon::core::ChannelDescriptor channel_descriptor;
    channel_descriptor.set_type(static_cast<kayon::core::ChannelType>(channel));
    kayon::core::ChannelDescriptorUtil::addParameter(channel_descriptor, "receiver", 1);
    kayon::core::ChannelDescriptorUtil::addParameter(channel_descriptor, param_name, address);

    std::shared_ptr<kayon::core::Channel> channel_ptr;
    switch (channel) {
        case 0: // ZMQ
            channel_ptr = std::make_shared<kayon::zmq::ZMQChannel>(channel_descriptor);
            break;
        case 1: // IOUring
            channel_ptr = std::make_shared<kayon::io_uring::IOUringChannel>(channel_descriptor);
            break;
        case 2: // RDMA
            kayon::core::ChannelDescriptorUtil::addParameter(channel_descriptor, "rdma_device", std::string("mlx5_0"));
            channel_ptr = std::make_shared<kayon::rdma::RDMAChannel>(channel_descriptor);
            break;
        default:
            LOG_ERROR_AND_THROW("Invalid channel value for CPU usage benchmark: %d", channel);
    }

    // Snapshot context switches (and CPU times) before the benchmark.
    struct rusage usage_start;
    getrusage(RUSAGE_SELF, &usage_start);
    CpuTimes start_time = getCpuUsage();  // currently unused; the monitor thread reports per-second usage
    double start_cpu_time = getCPUTime(); // currently unused
    (void)start_time;
    (void)start_cpu_time;

    channel_ptr->setCallback([&tupleBuffer, &schema, &data_size](void* buffer, size_t size) {
        size_t tupleSize = schema->getSchemaSize();
        if (size % tupleSize != 0) {
            LOG_ERROR("Received data size is not a multiple of tuple size! Expected multiple of %zu, got %zu", tupleSize, size);
            return;
        }
        size_t numTuples = size / tupleSize;
        total_received_tuple += numTuples;
        kayon::cuda::launchGpuProcessB(static_cast<char*>(buffer), size);
        data_size = size;
    });

    std::thread receiving_thread([&channel_ptr, accelerator, buffer_size, session] {
        void* buffer = nullptr;
        if (accelerator == 1) {
            buffer = kayon::cuda::CUDABufferAllocator::allocatePinnedMemory(buffer_size);
        } else {
            buffer = kayon::cpu::CPUBufferAllocator::allocatePinnedMemory(buffer_size);
        }
        channel_ptr->open(buffer, buffer_size);
        std::this_thread::sleep_for(std::chrono::seconds(session));
        channel_ptr->close();
        if (accelerator == 1) {
            kayon::cuda::CUDABufferAllocator::freePinnedMemory(buffer);
        } else {
            kayon::cpu::CPUBufferAllocator::freePinnedMemory(buffer, buffer_size);
        }
    });
    receiving_thread.join();

    // Snapshot context switches after the benchmark.
    struct rusage usage_end;
    getrusage(RUSAGE_SELF, &usage_end);
    long voluntary_cs = usage_end.ru_nvcsw - usage_start.ru_nvcsw;
    long nonvoluntary_cs = usage_end.ru_nivcsw - usage_start.ru_nivcsw;
    LOG_INFO("[CONTEXT SWITCH] Voluntary: %ld | Nonvoluntary: %ld", voluntary_cs, nonvoluntary_cs);

    // Save the context-switch counts to a CSV file.
    std::ofstream csFile("16rdmaSendContext_switch.csv");
    csFile << "Voluntary,Nonvoluntary\n";
    csFile << voluntary_cs << "," << nonvoluntary_cs << "\n";
    csFile.close();
    LOG_DEBUG("Finished CPU Usage Benchmark.");
}
void setBenchmark(const std::string& filename) {
    try {
        YAML::Node config = YAML::LoadFile(filename);
        int channel = config["receiver"]["channel"].as<int>();
        std::string channelName;
        switch (channel) {
            case 0: channelName = "zmq"; break;
            case 1: channelName = "iouring"; break;
            case 2: channelName = "rdma"; break;
            default:
                LOG_ERROR_AND_THROW("Invalid channel value: %d", channel);
        }
        size_t buffer_size = config["receiver"]["buffer_size"].as<size_t>();
        int accelerator = config["receiver"]["accelerator"].as<int>();
        int batch = config["receiver"]["batch"].as<int>();
        int session = config["receiver"]["session"].as<int>();
        int duration = config["receiver"]["duration"].as<int>();
        std::string param_name = config["receiver"]["name_address"][channelName].as<std::string>();
        std::string address = config["receiver"]["address"][channelName].as<std::string>();
        bool throughput = config["benchmark"]["throughput"].as<bool>();
        bool latency = config["benchmark"]["latency"].as<bool>();
        bool cpu_usage = config["benchmark"]["cpu_usage"].as<bool>();

        if (throughput) runThroughputBenchmark(channel, buffer_size, accelerator, batch, session, duration, param_name, address);
        if (latency) runLatencyBenchmark(channel, buffer_size, accelerator, batch, session, duration, param_name, address);
        if (cpu_usage) {
            // Start per-second CPU monitoring in the background.
            std::thread cpuMonitor(monitorCPUUsage, 5, session, 5);
            // Wait 5 s before the server starts so the monitor records a baseline.
            std::this_thread::sleep_for(std::chrono::seconds(5));
            runCpuUsageBenchmark(channel, buffer_size, accelerator, batch, session, param_name, address);
            std::this_thread::sleep_for(std::chrono::seconds(5));
            // Stop CPU monitoring.
            stopMonitoring.store(true);
            cpuMonitor.join();
        }
    } catch (const std::exception& e) {
        LOG_ERROR_AND_THROW("Error loading YAML file: %s", e.what());
    }
}
int main() {
    std::string yaml_file = "../kayon-benchmark/benchmark_transport.yaml";
    setBenchmark(yaml_file);
    return 0;
}
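The source above links against operators/gpu_wakeup_kernel.cuh, which is not part of this gist. For readers without that header, the sketch below shows one plausible shape for it, inferred only from the call sites above: initializeCudaEvents() and destroyGpuEvents() manage a CUDA event pair, and launchGpuProcessB() runs a trivial kernel over the received buffer and returns its elapsed time in milliseconds (the value the latency benchmark records as gpu_latency). The kernel body, launch geometry, and the assumption that the pinned buffer is device-accessible via unified virtual addressing are guesses, not the author's implementation.

// Hypothetical reconstruction of gpu_wakeup_kernel.cuh -- NOT the original.
#include <cuda_runtime.h>
#include <cstddef>

namespace kayon { namespace cuda {

static cudaEvent_t g_start, g_stop;

// Touch one byte per thread so the received buffer is actually read on the GPU.
// Assumes the pinned host buffer is device-accessible (true under UVA for
// memory from cudaHostAlloc/cudaMallocHost).
__global__ void wakeupKernel(char* data, size_t size) {
    size_t i = static_cast<size_t>(blockIdx.x) * blockDim.x + threadIdx.x;
    if (i < size) {
        volatile char c = data[i];
        (void)c;
    }
}

void initializeCudaEvents() {
    cudaEventCreate(&g_start);
    cudaEventCreate(&g_stop);
}

// Returns the kernel's elapsed time in milliseconds, matching how the
// latency benchmark uses the return value as gpu_latency.
float launchGpuProcessB(char* buffer, size_t size) {
    const int threads = 256;
    const int blocks = static_cast<int>((size + threads - 1) / threads);
    cudaEventRecord(g_start);
    wakeupKernel<<<blocks, threads>>>(buffer, size);
    cudaEventRecord(g_stop);
    cudaEventSynchronize(g_stop);
    float ms = 0.0f;
    cudaEventElapsedTime(&ms, g_start, g_stop);
    return ms;
}

void destroyGpuEvents() {
    cudaEventDestroy(g_start);
    cudaEventDestroy(g_stop);
}

}} // namespace kayon::cuda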
# benchmark_transport.yaml -- the configuration file loaded by main()
receiver:
  channel: 2            # 0 = ZMQ, 1 = IOUring, 2 = RDMA
  buffer_size: 294912   # 4096 tuples x 72 bytes (id + 64-byte name + score, assuming a packed layout)
  accelerator: 1        # 0 = CPU, 1 = CUDA
  batch: 512
  session: 4            # receive window in seconds
  duration: 1           # divisor used when averaging throughput and PPS
  name_address:
    zmq: "zmq_address"
    iouring: "iouring_address"
    rdma: "rdma_address"
  address:
    zmq: "tcp://0.0.0.0:8080"
    iouring: "tcp://0.0.0.0:8080"
    rdma: "nic://0.0.0.0:8080"
benchmark:
  throughput: true
  latency: false
  cpu_usage: false
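For context, a sender has to produce packets whose size is a multiple of the tuple layout from createSchema() (presumably 72 bytes: 4-byte id + 64-byte name + 4-byte score), or the receive callback rejects them. The standalone sketch below packs one 512-tuple batch (36864 bytes, matching batch: 512) and pushes it over raw libzmq. The kayon ZMQChannel's actual socket pattern and framing are not shown in the gist, so ZMQ_PUSH and a single zmq_send() per batch are assumptions, and compatibility with the receiver above is not guaranteed.

// Hypothetical sender sketch using raw libzmq -- the real peer would use
// kayon's sender-side Channel, which this gist does not include.
#include <zmq.h>
#include <cstdint>
#include <cstdio>
#include <vector>

#pragma pack(push, 1)
struct Tuple {                 // must mirror createSchema() on the receiver
    std::int32_t id;           // FieldType::INT32
    char         name[64];     // FieldType::STRING64
    float        score;        // FieldType::FLOAT32
};
#pragma pack(pop)
static_assert(sizeof(Tuple) == 72, "layout must match the receiver's schema");

int main() {
    void* ctx  = zmq_ctx_new();
    void* sock = zmq_socket(ctx, ZMQ_PUSH);     // socket type is an assumption
    zmq_connect(sock, "tcp://127.0.0.1:8080");  // receiver binds tcp://0.0.0.0:8080

    std::vector<Tuple> batch(512);              // batch: 512, as in the YAML
    for (size_t i = 0; i < batch.size(); ++i) {
        batch[i].id = static_cast<std::int32_t>(i);
        std::snprintf(batch[i].name, sizeof(batch[i].name), "tuple-%zu", i);
        batch[i].score = 0.5f * static_cast<float>(i);
    }
    // 512 tuples * 72 bytes = 36864 bytes, a whole multiple of the tuple size.
    zmq_send(sock, batch.data(), batch.size() * sizeof(Tuple), 0);

    zmq_close(sock);
    zmq_ctx_destroy(ctx);
    return 0;
}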