Skip to content

Instantly share code, notes, and snippets.

@owent
Last active July 27, 2023 02:25
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save owent/72c3fd5f4bb63a863641 to your computer and use it in GitHub Desktop.
Save owent/72c3fd5f4bb63a863641 to your computer and use it in GitHub Desktop.
zeromq 集群模型压力测试

README

proxyd进程

proxyd有一个接受endpoint,并会监听一个内部地址和一个外部地址

内部地址用于连接本机的serverd,外部地址用于和其他集群的proxyd通信

proxyd对内部serverd发送消息采用zmq router节点,对每个外部proxyd都会建立一个zmq dealer节点与之通信

proxyd接收数据统一通过内部zmq socket, 对内不服务器发送消息也通过这个zmq sokcet,对每个外部proxyd发送消息会有一个专用的zmq socket

proxyd收到内部节点之间的数据通信消息后会通知这两个节点建立起通信通道,之后这两个兄弟节点会直接通信,不再通过proxyd中转

serverd进程

serverd进程会监听一个内部地址,用于接收消息

serverd发送消息时,如果对方处于同一个proxyd下,第一次会通过proxyd中转,后续会收到proxyd的兄弟节点互联消息,而后直接通信

serverd发送消息到另一个proxyed下的serverd时,会通过proxyd中转

serverd的压力测试逻辑为循环发送消息,最后一个消息时检测时间间隔,计算性能数据

client进程

client进程的功能仅仅用于通知serverd开始压力测试逻辑,不接受回包。直接退出

测试报告

测试环境

名称
CPU Intel(R) Xeon(R) CPU X3440 @ 2.53GHz * 8
内存 8096GB
网卡 千兆网卡
磁盘 压测程序未使用
系统 CentOS 6.2
GCC GCC 4.9.1
编译选项 -std=c++11 -Ilibzmq/include -Llibzmq/lib -lzmq -lstdc++ -g -ggdb -O2 -pthread -lrt

测试结果

测试报告中左边为本机通信(即同一proxyd下两个serverd之间通信),绑定协议为ipc

测试报告中右边为跨机器通信(即不同机器下的两个proxyd下两个serverd之间通信),内部通信绑定协议为ipc,外部通信绑定协议为tcp

测试报告中第一组测试逻辑为两个serverd互相中转消息N次(会等待收到消息后再转回去)

测试报告中第二组测试逻辑为一个serverd循环想另一个serverd发送消息N次(没有等待操作)

zeromq 性能测试报告

#include "zmq.hpp"
#include <assert.h>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <map>
#include <set>
#include <list>
#include <string>
#include <memory>
// 通信模型
// {[CLIENT]}->[DEALER]<->[ROUTER|ROUTER]<->[ROUTER|ROUTER]<->[DEALER]
//
enum msg_type {
EN_REPORT = 1,
EN_TRANS = 2,
};
struct msg_send {
char src[16];
char dst[16];
};
struct app_reg {
char app_name[16];
char proxy_name[16];
char listen[64];
};
struct msg_head {
msg_type type;
union {
msg_send send;
app_reg reg;
};
};
struct msg_body {
int left_times;
int sum_times;
time_t start;
time_t end;
};
int main(int argc, char *argv[]) {
if (argc < 5) {
printf("usage: %s <src endpoint> <dst name> <times> <msg>\n", argv[0]);
return 0;
}
zmq::context_t ctx(1);
// app接收端点
zmq::socket_t app_endpoint(ctx, ZMQ_DEALER);
app_endpoint.connect(argv[1]);
size_t len = strlen(argv[4]);
zmq::message_t msg(sizeof(msg_head) +sizeof(msg_body) +len);
msg_head* head = (msg_head*) msg.data();
msg_body* body = (msg_body*) ((char*) msg.data() + sizeof(msg_head));
head->type = EN_TRANS;
strncpy(head->send.src, argv[2], sizeof(head->send.src));
strncpy(head->send.dst, "", sizeof(head->send.dst));
body->start = 0;
body->end = 0;
body->left_times = body->sum_times = atoi(argv[3]);
memcpy((char*) msg.data() + sizeof(msg_head) +sizeof(msg_body), argv[4], len);
bool b = app_endpoint.send(msg, 0);
if (!b)
fprintf(stderr, "send failed.\n");
return 0;
}
#include "zmq.hpp"
#include <assert.h>
#include <cstdio>
#include <cstring>
#include <map>
#include <set>
#include <vector>
#include <list>
#include <string>
#include <memory>
// 通信模型
// [CLIENT]->[DEALER]<->{[ROUTER|ROUTER]<->[ROUTER|ROUTER]}<->[DEALER]
//
enum msg_type {
EN_REPORT = 1,
EN_TRANS = 2,
};
struct msg_send {
char src[16];
char dst[16];
};
struct app_reg {
char app_name[16];
char proxy_name[16];
char listen[64];
};
struct msg_head {
msg_type type;
union {
msg_send send;
app_reg reg;
};
};
struct msg_body {
int left_times;
int sum_times;
time_t start;
time_t end;
};
void dump_data(void* data, size_t s) {
while (s > 0) {
msg_head* h = (msg_head*) data;
if (s >= sizeof(msg_head) && EN_REPORT == h->type) {
printf("[DEBUG] recv report, app name %s, proxy name %s, listen %s\n", h->reg.app_name, h->reg.proxy_name, h->reg.listen);
s -= sizeof(msg_head);
data = (char*) data + sizeof(msg_head);
continue;
}
if (s >= sizeof(msg_head) && EN_TRANS == h->type) {
printf("[DEBUG] recv transform, src %s, dst %s\n", h->send.src, h->send.dst);
msg_body* body = (msg_body*) ((char*) data + sizeof(msg_head));
printf("[-----] start time:%llu, end time: %llu, sum: %d, left %d\n", body->start, body->end, body->sum_times, body->left_times);
char* str = (char*) malloc(s);
memset(str, 0, s);
memcpy(str, (char*) data + sizeof(msg_head) +sizeof(msg_body), s - sizeof(msg_head) -sizeof(msg_body));
printf("%s\n", str);
free(str);
s = 0;
continue;
}
printf("dump error msg (len => %d).\n", (int) s);
char* p = (char*) data;
for (size_t i = 0; i < s; ++i) {
if (p[i] <= 126 && p[i] >= 32)
putchar(p[i]);
else
printf("%p", p[i]);
}
putchar('\n');
s = 0;
}
}
char my_name[16] = {0};
std::map<std::string, app_reg> g_app_list;
std::map<std::string, std::shared_ptr<zmq::socket_t> > g_proxy_list;
std::list<std::shared_ptr<zmq::socket_t> > g_proxy_socks;
bool reg_proxy(app_reg& proxy, std::shared_ptr<zmq::socket_t> sock) {
std::string app_name = proxy.app_name;
if (g_proxy_list.find(app_name) != g_proxy_list.end()) {
return false;
}
g_proxy_list[app_name] = sock;
printf("reg proxy %s success.\n", app_name.c_str());
return true;
}
void connect_to_proxy(zmq::context_t& ctx, const char* listen, msg_head& proxy_reg) {
std::shared_ptr<zmq::socket_t> send_endpoint = std::shared_ptr<zmq::socket_t>(new zmq::socket_t(ctx, ZMQ_DEALER));
send_endpoint->connect(listen);
send_endpoint->send(&proxy_reg, sizeof(proxy_reg), 0);
zmq::pollitem_t poll_items[1];
poll_items[0].events = ZMQ_POLLIN;
poll_items[0].fd = 0;
poll_items[0].revents = 0;
poll_items[0].socket = *send_endpoint;
int rc = zmq::poll(poll_items, 1, 1000);
if (rc <= 0) {
fprintf(stderr, "poll ret: %d\n", rc);
return;
}
zmq::message_t msg;
send_endpoint->recv(&msg, 0);
msg_head* head = (msg_head*) msg.data();
dump_data(msg.data(), msg.size());
if (EN_REPORT != head->type)
return;
if (false == reg_proxy(head->reg, send_endpoint))
return;
g_proxy_socks.push_back(send_endpoint);
}
bool reg_app(app_reg& app) {
std::string app_name = app.app_name;
if (g_app_list.find(app_name) != g_app_list.end()) {
return false;
}
memcpy(&g_app_list[app_name], &app, sizeof(app));
printf("reg app %s(proxy: %s) success.\n", app.app_name, app.proxy_name);
return true;
}
bool reg_bro_app(zmq::socket_t& router_sock, app_reg& src, app_reg& dst) {
msg_head app_reg;
app_reg.type = EN_REPORT;
strncpy(app_reg.reg.app_name, src.app_name, sizeof(app_reg.reg.app_name));
strncpy(app_reg.reg.proxy_name, src.proxy_name, sizeof(app_reg.reg.proxy_name));
strncpy(app_reg.reg.listen, src.listen, sizeof(app_reg.reg.listen));
router_sock.send(dst.app_name, strlen(dst.app_name), ZMQ_NOBLOCK | ZMQ_SNDMORE);
router_sock.send(&app_reg, sizeof(app_reg), ZMQ_NOBLOCK);
return true;
}
int main(int argc, char *argv[]) {
if (argc < 4) {
printf("usage: %s <name> <recv endpoint> <local endpoint> [connect proxy] ...\n", argv[0]);
return 0;
}
strncpy(my_name, argv[1], sizeof(my_name));
size_t name_len = strlen(my_name);
// 当前节点信息
msg_head proxy_reg;
proxy_reg.type = EN_REPORT;
strncpy(proxy_reg.reg.app_name, my_name, sizeof(proxy_reg.reg.app_name));
strncpy(proxy_reg.reg.proxy_name, my_name, sizeof(proxy_reg.reg.proxy_name));
strncpy(proxy_reg.reg.listen, argv[2], sizeof(proxy_reg.reg.listen));
zmq::context_t ctx(1);
// 接收端点
std::shared_ptr<zmq::socket_t> recv_endpoint = std::shared_ptr<zmq::socket_t>(new zmq::socket_t(ctx, ZMQ_ROUTER));
recv_endpoint->setsockopt(ZMQ_IDENTITY, my_name, name_len);
recv_endpoint->bind(argv[2]);
recv_endpoint->bind(argv[3]);
reg_proxy(proxy_reg.reg, recv_endpoint);
for (int i = 4; i < argc; ++i) {
// 临时发送端点
connect_to_proxy(ctx, argv[i], proxy_reg);
}
while (true) {
zmq::pollitem_t poll_items[1];
poll_items[0].events = ZMQ_POLLIN;
poll_items[0].fd = 0;
poll_items[0].revents = 0;
poll_items[0].socket = *recv_endpoint;
int rc = zmq::poll(poll_items, 1);
if (rc < 0) {
fprintf(stderr, "poll ret: %d\n", rc);
continue;
}
if (poll_items[0].revents & ZMQ_POLLIN) {
zmq::message_t msg, msg_route_node;
// 第一次会收到路由节点名称
while (recv_endpoint->recv(&msg_route_node, ZMQ_NOBLOCK)) {
bool flag = recv_endpoint->recv(&msg, ZMQ_NOBLOCK);
if (false == flag) {
fprintf(stderr, "poll success but recv failed\n");
continue;
}
// dump_data(msg.data(), msg.size());
size_t left_len = msg.size();
while (left_len >= sizeof(msg_head)) {
msg_head* head = (msg_head*) msg.data();
// proxy 注册协议
if (EN_REPORT == head->type) {
left_len -= sizeof(msg_head);
app_reg& app = head->reg;
// 如果是proxy则返回自身的信息以供注册
if (0 == strcmp(app.app_name, app.proxy_name)) {
recv_endpoint->send(msg_route_node.data(), msg_route_node.size(), ZMQ_NOBLOCK | ZMQ_SNDMORE);
recv_endpoint->send(&proxy_reg, sizeof(proxy_reg), ZMQ_NOBLOCK);
// 反向连接
if (g_proxy_list.find(app.app_name) == g_proxy_list.end()) {
connect_to_proxy(ctx, app.listen, proxy_reg);
if (g_proxy_list.find(app.app_name) == g_proxy_list.end()) {
continue;
}
// 反向注册所有的app到远程proxy
for (std::map<std::string, app_reg>::value_type& app_node : g_app_list) {
if (0 != strcmp(app_node.second.proxy_name, my_name))
continue;
msg_head app_reg;
app_reg.type = EN_REPORT;
strncpy(app_reg.reg.app_name, app_node.second.app_name, sizeof(app_reg.reg.app_name));
strncpy(app_reg.reg.proxy_name, my_name, sizeof(app_reg.reg.proxy_name));
strncpy(app_reg.reg.listen, argv[2], sizeof(app_reg.reg.listen));
g_proxy_list[app.app_name]->send(&app_reg, sizeof(app_reg), ZMQ_NOBLOCK);
}
}
}
else if (0 == app.proxy_name[0]) { // 子节点同步广播到所有proxy
msg_head app_reg;
app_reg.type = EN_REPORT;
strncpy(app_reg.reg.app_name, app.app_name, sizeof(app_reg.reg.app_name));
strncpy(app_reg.reg.proxy_name, my_name, sizeof(app_reg.reg.proxy_name));
strncpy(app_reg.reg.listen, argv[2], sizeof(app_reg.reg.listen));
for (const std::map<std::string, std::shared_ptr<zmq::socket_t> >::value_type& proxy : g_proxy_list) {
if (proxy.second == recv_endpoint)
continue;
proxy.second->send(&app_reg, sizeof(app_reg), ZMQ_NOBLOCK);
}
strncpy(app_reg.reg.listen, app.listen, sizeof(app_reg.reg.listen));
reg_app(app_reg.reg);
}
else {
reg_app(app);
}
}
else if (EN_TRANS == head->type) {
left_len = 0;
msg_send& send = head->send;
std::map<std::string, app_reg>::iterator iter = g_app_list.find(send.dst);
if (iter == g_app_list.end()) {
fprintf(stderr, "send msg from src(%s) to dst(%s) failed. dst not found. cur node(%s).\n", send.src, send.dst, my_name);
continue;
}
// 发送到下属子节点
if (0 == strcmp(iter->second.proxy_name, my_name)) {
recv_endpoint->send(iter->second.app_name, strlen(iter->second.app_name), ZMQ_SNDMORE);
bool b = recv_endpoint->send(msg, 0);
if (false == b) {
fprintf(stderr, "send msg from src(%s) to dst(%s) failed. send failed. cur node(%s).\n", send.src, send.dst, my_name);
}
}
else { // 发送到远程代理节点
std::map<std::string, std::shared_ptr<zmq::socket_t> >::iterator proxy_it = g_proxy_list.find(iter->second.proxy_name);
if (proxy_it == g_proxy_list.end()) {
fprintf(stderr, "proxy %s not registered.\n", iter->second.proxy_name);
continue;
}
bool b = proxy_it->second->send(msg, 0);
if (false == b) {
fprintf(stderr, "send msg from src(%s) to dst(%s) failed. send failed. cur node(%s).\n", send.src, send.dst, my_name);
}
}
// 下发兄弟节点信息
std::map<std::string, app_reg>::iterator iter_opt = g_app_list.find(send.src);
if (iter_opt == g_app_list.end()) {
fprintf(stderr, "send msg from src(%s) to dst(%s) warning. src not found. cur node(%s).\n", send.src, send.dst, my_name);
continue;
}
if (0 == strcmp(iter_opt->second.proxy_name, my_name) && 0 == strcmp(iter->second.proxy_name, my_name)) {
reg_bro_app(*recv_endpoint, iter->second, iter_opt->second);
reg_bro_app(*recv_endpoint, iter_opt->second, iter->second);
}
}
else {
fprintf(stderr, "got unknown action type %d.\n", head->type);
left_len = 0;
}
}
if (left_len != 0) {
dump_data(msg.data(), msg.size());
}
}
}
}
return 0;
}
#include "zmq.hpp"
#include <assert.h>
#include <cstdio>
#include <cstring>
#include <ctime>
#include <map>
#include <set>
#include <list>
#include <string>
#include <memory>
// 通信模型
// [CLIENT]->{[DEALER]}<->[ROUTER|ROUTER]<->[ROUTER|ROUTER]<->{[DEALER]}
//
enum msg_type {
EN_REPORT = 1,
EN_TRANS = 2,
};
struct msg_send {
char src[16];
char dst[16];
};
struct app_reg {
char app_name[16];
char proxy_name[16];
char listen[64];
};
struct msg_head {
msg_type type;
union {
msg_send send;
app_reg reg;
};
};
struct msg_body {
int left_times;
int sum_times;
time_t start;
time_t end;
};
void dump_data(void* data, size_t s) {
while (s > 0) {
msg_head* h = (msg_head*) data;
if (s >= sizeof(msg_head) && EN_REPORT == h->type) {
printf("[DEBUG] recv report, app name %s, proxy name %s, listen %s\n", h->reg.app_name, h->reg.proxy_name, h->reg.listen);
s -= sizeof(msg_head);
data = (char*) data + sizeof(msg_head);
continue;
}
if (s >= sizeof(msg_head) && EN_TRANS == h->type) {
printf("[DEBUG] recv transform, src %s, dst %s\n", h->send.src, h->send.dst);
msg_body* body = (msg_body*) ((char*) data + sizeof(msg_head));
printf("[-----] start time:%llu, end time: %llu, sum: %d, left %d\n", body->start, body->end, body->sum_times, body->left_times);
char* str = (char*) malloc(s);
memset(str, 0, s);
memcpy(str, (char*) data + sizeof(msg_head) +sizeof(msg_body), s - sizeof(msg_head) -sizeof(msg_body));
printf("%s\n", str);
free(str);
s = 0;
continue;
}
printf("dump error msg (len => %d).\n", (int) s);
char* p = (char*) data;
for (size_t i = 0; i < s; ++i) {
if (p[i] <= 126 && p[i] >= 32)
putchar(p[i]);
else
printf("%p", p[i]);
}
putchar('\n');
s = 0;
}
}
char my_name[16] = { 0 };
struct app_data {
app_reg app;
std::shared_ptr<zmq::socket_t> sock;
};
std::map<std::string, app_data> g_app_list;
bool reg_app(zmq::context_t& ctx, app_reg& app) {
std::string app_name = app.app_name;
if (g_app_list.find(app_name) != g_app_list.end()) {
return false;
}
memcpy(&g_app_list[app_name].app, &app, sizeof(app));
std::shared_ptr<zmq::socket_t> sock = g_app_list[app_name].sock = std::shared_ptr<zmq::socket_t>(new zmq::socket_t(ctx, ZMQ_DEALER));
sock->connect(app.listen);
printf("reg bro app %s success.\n", app.app_name);
return true;
}
bool send_msg(zmq::socket_t& proxy_sock, zmq::message_t& msg) {
msg_head* head = (msg_head*) msg.data();
std::string dst = head->send.dst;
std::map<std::string, app_data>::iterator iter = g_app_list.find(dst);
if (iter == g_app_list.end())
proxy_sock.send(msg, 0);
else
iter->second.sock->send(msg, 0);
return true;
}
int main(int argc, char *argv[]) {
if (argc < 4) {
printf("usage: %s <name> <recv endpoint> <proxy>\n", argv[0]);
return 0;
}
strncpy(my_name, argv[1], sizeof(my_name));
size_t name_len = strlen(my_name);
zmq::context_t ctx(1);
// proxy接收端点
zmq::socket_t proxy_endpoint(ctx, ZMQ_DEALER);
proxy_endpoint.setsockopt(ZMQ_IDENTITY, my_name, name_len);
proxy_endpoint.connect(argv[3]);
// app接收端点
zmq::socket_t app_endpoint(ctx, ZMQ_DEALER);
app_endpoint.bind(argv[2]);
// 发送注册消息
{
// 当前节点信息
msg_head app_reg;
app_reg.type = EN_REPORT;
strncpy(app_reg.reg.app_name, my_name, sizeof(app_reg.reg.app_name));
memset(app_reg.reg.proxy_name, 0, sizeof(app_reg.reg.proxy_name));
strncpy(app_reg.reg.listen, argv[2], sizeof(app_reg.reg.listen));
proxy_endpoint.send(&app_reg, sizeof(app_reg), ZMQ_NOBLOCK);
}
while (true) {
zmq::pollitem_t poll_items[2];
zmq::socket_t* pool_sock [2];
poll_items[0].events = ZMQ_POLLIN;
poll_items[0].fd = 0;
poll_items[0].revents = 0;
poll_items[0].socket = *(pool_sock[0] = &proxy_endpoint);
poll_items[1].events = ZMQ_POLLIN;
poll_items[1].fd = 0;
poll_items[1].revents = 0;
poll_items[1].socket = *(pool_sock[1] = &app_endpoint);
int rc = zmq::poll(poll_items, 2);
if (rc < 0) {
fprintf(stderr, "poll ret: %d\n", rc);
continue;
}
for (int i = 0; i < 2; ++i) {
if (!(poll_items[i].revents & ZMQ_POLLIN))
continue;
zmq::message_t msg;
while (pool_sock[i]->recv(&msg, ZMQ_NOBLOCK)) {
//dump_data(msg.data(), msg.size());
size_t left_len = msg.size();
while (left_len >= sizeof(msg_head)) {
msg_head* head = (msg_head*) msg.data();
// app 注册协议
if (EN_REPORT == head->type) {
left_len -= sizeof(msg_head);
app_reg& app = head->reg;
reg_app(ctx, app);
}
else if (EN_TRANS == head->type) {
left_len = 0;
msg_send& send = head->send;
msg_body* body = (msg_body*) ((char*) msg.data() + sizeof(msg_head));
bool is_start = (0 == body->start);
if (is_start) {
printf("got msg src %s,dst %s, times: %d, msg len: %d\n", send.src, send.dst, body->sum_times, (int) msg.size());
body->start = time(NULL);
body->left_times = body->sum_times;
}
if (body->left_times <= 0) {
int cost_time = (int) (body->end - body->start);
cost_time = cost_time <= 0 ? 1 : cost_time;
printf("========================================================================================================================\n");
printf("process msg (size: %d/%d) transform %d times done. cost %d seconds. avg:(%lg/s)\n",
(int)(msg.size() - sizeof(msg_head) -sizeof(msg_body)),
(int)msg.size(),
body->sum_times,
cost_time,
1.0 * body->sum_times / cost_time
);
printf("========================================================================================================================\n");
continue;
}
if (is_start) {
// 交换发送方
{
memcpy(send.dst, send.src, sizeof(send.dst));
memcpy(send.src, my_name, sizeof(send.src));
}
while (body->left_times >= 0) {
if (0 == body->left_times)
body->end = time(NULL);
zmq::message_t snd_msg(msg.size());
memcpy(snd_msg.data(), msg.data(), msg.size());
send_msg(proxy_endpoint, snd_msg);
-- body->left_times;
}
}
}
else {
fprintf(stderr, "got unknown action type %d.\n", head->type);
left_len = 0;
}
}
if (left_len != 0) {
dump_data(msg.data(), msg.size());
}
}
}
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment