Skip to content

Instantly share code, notes, and snippets.

@sabottenda
Created November 10, 2013 15:33
Show Gist options
  • Save sabottenda/7399604 to your computer and use it in GitHub Desktop.
Save sabottenda/7399604 to your computer and use it in GitHub Desktop.
websocket bench with libwebsockets
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <signal.h>
#include <memory.h>
#include <unistd.h>
#include <sys/time.h>
#include <sys/stat.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/wait.h>
#include <libwebsockets.h>
/* Sub-protocol name: used in the protocols table and passed when connecting. */
#define PROTOCAL_NAME "default"
/* Number of worker processes forked by main(). */
#define NPROCESS 4
/* Number of client connections each worker opens. */
#define NCONS 10
/* With rpc_limit on: responses handled per one-second window before a worker
 * stops requesting further writes until the window rolls over. */
#define LIMIT 500

/* SysV semaphore set id; written by init_semaphore(), used by lock()/unlock(). */
int semaphore_id;
/* Set to 1 by sighandler() in a worker; polled by the worker's service loops.
 * volatile sig_atomic_t because it is written from a signal handler. */
volatile sig_atomic_t force_exit = 0;
/* Non-zero enables the LIMIT-per-second throttle. */
int rpc_limit = 1;
/* Initialised to NCONS; not read anywhere else in this file. */
int nconnections = NCONS;
/* Non-zero enables verbose tracing on stdout/stderr. */
int debug_mode = 0;
/* -1 in the parent; set to the worker index (0..NPROCESS-1) in each child. */
int unique_id = -1;
/* Shared-memory block attached in main() and inherited by the workers. */
struct shared_data *shared;
/*
 * Per-connection statistics. callback_test() receives a pointer to one of
 * these as `user`; its size is registered in the protocols table.
 */
struct per_serssion_data {
    /* Taken from the context's next_session_id when the client connection is established. */
    int session_id;
    /* wcount: messages written; rcount: messages received (while started). */
    unsigned long long wcount, rcount;
    /* start_time: stamped just before each write; end_time: stamped on each receive. */
    struct timeval start_time, end_time;
    /* Sum / largest / smallest of (end_time - start_time), in seconds. */
    double total_response_time, max_response_time, min_response_time;
};
/*
 * Per-worker (per libwebsockets context) state and aggregated statistics.
 * Installed as the context's user pointer and copied into shared memory
 * when the worker finishes.
 */
struct per_context_data {
    /* next_session_id: id handed to the next established session (also the
     * count of sessions established so far).
     * start: set to 1 once the worker begins the measured phase. */
    int next_session_id, start;
    /* limit_count: responses seen in the current throttle window.
     * wcount / rcount: totals folded in from each session when it closes. */
    unsigned long long limit_count, wcount, rcount;
    /* prev_time: start of the current throttle window.
     * start_time / end_time: not written by the code visible in this file. */
    struct timeval prev_time, start_time, end_time;
    /* overall_time: wall-clock length of the measured phase, in seconds.
     * The remaining three aggregate the per-session response times. */
    double overall_time, total_response_time, max_response_time, min_response_time;
};
/* One slot per worker process, stored in the shared-memory block. */
struct per_process_data {
    /* pid: child pid recorded by the parent after fork().
     * unique_id: worker index recorded by the parent. */
    int pid, unique_id;
    /* Final statistics, copied in by the worker just before it exits. */
    struct per_context_data pcd;
};
/* Header of the shared-memory block that main() creates before forking. */
struct shared_data {
/* Zeroed by main()'s memset; no other use is visible in this file. */
int next_unique_id;
/* Array of NPROCESS per-worker slots located inside the same shared segment. */
struct per_process_data *ppd;
};
/*
 * Printable names for callback reasons, used by callback_test()'s debug
 * trace, which indexes this table directly with the numeric `reason` value.
 * NOTE(review): the order presumably mirrors enum libwebsocket_callback_reasons
 * of the libwebsockets version this was written against - confirm against the
 * installed header, since a mismatch silently prints the wrong name.
 */
const char *callback_reasons[] = {
"LWS_CALLBACK_ESTABLISHED",
"LWS_CALLBACK_CLIENT_CONNECTION_ERROR",
"LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH",
"LWS_CALLBACK_CLIENT_ESTABLISHED",
"LWS_CALLBACK_CLOSED",
"LWS_CALLBACK_CLOSED_HTTP",
"LWS_CALLBACK_RECEIVE",
"LWS_CALLBACK_CLIENT_RECEIVE",
"LWS_CALLBACK_CLIENT_RECEIVE_PONG",
"LWS_CALLBACK_CLIENT_WRITEABLE",
"LWS_CALLBACK_SERVER_WRITEABLE",
"LWS_CALLBACK_HTTP",
"LWS_CALLBACK_HTTP_FILE_COMPLETION",
"LWS_CALLBACK_HTTP_WRITEABLE",
"LWS_CALLBACK_FILTER_NETWORK_CONNECTION",
"LWS_CALLBACK_FILTER_HTTP_CONNECTION",
"LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION",
"LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS",
"LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS",
"LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION",
"LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER",
"LWS_CALLBACK_CONFIRM_EXTENSION_OKAY",
"LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED",
"LWS_CALLBACK_PROTOCOL_INIT",
"LWS_CALLBACK_PROTOCOL_DESTROY",
"LWS_CALLBACK_ADD_POLL_FD",
"LWS_CALLBACK_DEL_POLL_FD",
"LWS_CALLBACK_SET_MODE_POLL_FD",
"LWS_CALLBACK_CLEAR_MODE_POLL_FD"
};
/*
 * Returns a - b in seconds (fractional part from the microsecond fields).
 * The result is negative when a is earlier than b; the subtraction is done
 * in floating point so an earlier `a` cannot wrap around to a huge value.
 */
static double calc_time_diff(struct timeval a, struct timeval b) {
    double sec = (double)a.tv_sec - (double)b.tv_sec;
    double usec = (double)a.tv_usec - (double)b.tv_usec;
    return sec + usec / 1000000.0;
}
/*
 * Protocol callback registered in the protocols table.
 *
 * context - libwebsockets context; its user pointer is this worker's
 *           struct per_context_data.
 * wsi     - connection the event refers to.
 * reason  - event being reported.
 * user    - per-session storage (struct per_serssion_data), may be NULL.
 * in/len  - payload for LWS_CALLBACK_CLIENT_RECEIVE.
 *
 * Returns 0 normally, -1 when a write fails or is short.
 *
 * Once the context's `start` flag is set, each connection runs a ping-pong:
 * WRITEABLE sends "hoge<N>" and stamps start_time, RECEIVE stamps end_time,
 * records the round-trip and asks to become writable again (subject to the
 * LIMIT-per-second throttle when rpc_limit is on).
 */
static int
callback_test(struct libwebsocket_context *context,
              struct libwebsocket *wsi,
              enum libwebsocket_callback_reasons reason,
              void *user, void *in, size_t len) {
    /* Room reserved for the text payload ("hoge" + a 64-bit counter). */
    enum { PAYLOAD_MAX = 100 };

    struct per_serssion_data *psd = (struct per_serssion_data *)user;
    struct per_context_data *pcd =
        (struct per_context_data *)libwebsocket_context_user(context);

    if (debug_mode) {
        /* The library may report reasons newer than the name table. */
        const size_t nreasons = sizeof(callback_reasons) / sizeof(callback_reasons[0]);
        const char *reason_name =
            ((size_t)reason < nreasons) ? callback_reasons[reason] : "UNKNOWN";

        if (psd != NULL)
            printf("[SID:%d] ", psd->session_id);
        else
            printf("[SID:nil] ");
        printf("callback_test is called: reason:%s(%d)\n", reason_name, reason);
    }

    switch (reason) {
    case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
        printf("connection error\n");
        break;

    case LWS_CALLBACK_CLIENT_ESTABLISHED:
        memset(psd, 0, sizeof(struct per_serssion_data));
        /* Large sentinel so the first measured response becomes the minimum. */
        psd->min_response_time = 1 << 20;
        psd->session_id = pcd->next_session_id++;
        if (debug_mode) printf("[SID:%d] established\n", psd->session_id);
        break;

    case LWS_CALLBACK_CLOSED:
        if (psd == NULL || pcd == NULL)
            break;
        if (debug_mode) printf("[SID:%d] closed\n", psd->session_id);
        /* Fold this session's statistics into the per-worker totals. */
        pcd->wcount += psd->wcount;
        pcd->rcount += psd->rcount;
        pcd->total_response_time += psd->total_response_time;
        if (pcd->max_response_time < psd->max_response_time)
            pcd->max_response_time = psd->max_response_time;
        if (pcd->min_response_time > psd->min_response_time)
            pcd->min_response_time = psd->min_response_time;
        break;

    case LWS_CALLBACK_CLIENT_RECEIVE: {
        if (!pcd->start)
            break;

        gettimeofday(&psd->end_time, NULL);

        if (rpc_limit) {
            /* Open a new throttle window once a second has passed. */
            if (calc_time_diff(psd->start_time, pcd->prev_time) >= 1.0) {
                pcd->prev_time = psd->start_time;
                pcd->limit_count = 0;
            }
            pcd->limit_count++;
        }

        double elapsed = calc_time_diff(psd->end_time, psd->start_time);
        psd->total_response_time += elapsed;
        if (psd->max_response_time < elapsed) psd->max_response_time = elapsed;
        if (psd->min_response_time > elapsed) psd->min_response_time = elapsed;
        psd->rcount++;

        /* Print with an explicit length instead of storing a NUL at in[len],
         * which is one byte past the payload we were handed. */
        if (debug_mode)
            fprintf(stderr, "RECEIVE: len:%d in:%.*s count:%llu\n",
                    (int)len, (int)len, in != NULL ? (const char *)in : "",
                    psd->rcount);

        if (!rpc_limit || pcd->limit_count < LIMIT)
            libwebsocket_callback_on_writable(context, wsi);
        break;
    }

    case LWS_CALLBACK_CLIENT_WRITEABLE: {
        if (!pcd->start)
            break;

        /* libwebsocket_write() needs LWS_SEND_BUFFER_PRE_PADDING bytes before
         * the payload pointer and LWS_SEND_BUFFER_POST_PADDING after it. */
        unsigned char frame[LWS_SEND_BUFFER_PRE_PADDING + PAYLOAD_MAX +
                            LWS_SEND_BUFFER_POST_PADDING];
        unsigned char *payload = &frame[LWS_SEND_BUFFER_PRE_PADDING];

        psd->wcount++;
        if (debug_mode) printf("WRITEABLE: count:%llu\n", psd->wcount);

        int msg_size = snprintf((char *)payload, PAYLOAD_MAX, "hoge%llu", psd->wcount);
        if (msg_size < 0 || msg_size >= PAYLOAD_MAX)
            return -1;

        /* Stamp as late as possible so the round-trip excludes formatting. */
        gettimeofday(&psd->start_time, NULL);
        int n = libwebsocket_write(wsi, payload, msg_size, LWS_WRITE_TEXT);
        if (n < 0)
            return -1;
        if (n < msg_size) {
            lwsl_err("Partial write LWS_CALLBACK_CLIENT_WRITEABLE\n");
            return -1;
        }
        break;
    }

    /* Reasons that are recognised but deliberately ignored. */
    case LWS_CALLBACK_ESTABLISHED:
    case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
    case LWS_CALLBACK_CLOSED_HTTP:
    case LWS_CALLBACK_RECEIVE:
    case LWS_CALLBACK_CLIENT_RECEIVE_PONG:
    case LWS_CALLBACK_SERVER_WRITEABLE:
    case LWS_CALLBACK_HTTP:
    case LWS_CALLBACK_HTTP_FILE_COMPLETION:
    case LWS_CALLBACK_HTTP_WRITEABLE:
    case LWS_CALLBACK_FILTER_NETWORK_CONNECTION:
    case LWS_CALLBACK_FILTER_HTTP_CONNECTION:
    case LWS_CALLBACK_FILTER_PROTOCOL_CONNECTION:
    case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
    case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_SERVER_VERIFY_CERTS:
    case LWS_CALLBACK_OPENSSL_PERFORM_CLIENT_CERT_VERIFICATION:
    case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
    case LWS_CALLBACK_CONFIRM_EXTENSION_OKAY:
    case LWS_CALLBACK_CLIENT_CONFIRM_EXTENSION_SUPPORTED:
    case LWS_CALLBACK_PROTOCOL_INIT:
    case LWS_CALLBACK_PROTOCOL_DESTROY:
    case LWS_CALLBACK_ADD_POLL_FD:
    case LWS_CALLBACK_DEL_POLL_FD:
    case LWS_CALLBACK_SET_MODE_POLL_FD:
    case LWS_CALLBACK_CLEAR_MODE_POLL_FD:
        break;

    default:
        printf("callback_test: default is called %d\n", reason);
        break;
    }
    return 0;
}
/*
 * Protocol table handed to libwebsockets through info.protocols in
 * worker_process(). It has a single real entry followed by an all-zero
 * terminator.
 */
static struct libwebsocket_protocols protocols[] = {
{
PROTOCAL_NAME,  /* protocol name, also passed when connecting */
callback_test,  /* event callback */
sizeof(struct per_serssion_data),  /* size of the per-session `user` storage */
20,  /* NOTE(review): presumably rx_buffer_size - confirm against the libwebsockets version in use */
},
{ NULL, NULL, 0, 0 } /* end */
};
/*
 * Creates a private semaphore set holding one semaphore, stores its id in
 * semaphore_id and sets the semaphore's value to 1.
 * Prints "ERR" to stderr and exits with status 1 if either step fails.
 */
void init_semaphore() {
    int failed;

    semaphore_id = semget(IPC_PRIVATE, 1, 0666);
    failed = (semaphore_id < 0);
    if (!failed)
        failed = (semctl(semaphore_id, 0, SETVAL, 1) < 0);

    if (failed) {
        fprintf(stderr, "%s\n","ERR");
        exit(1);
    }
}
/*
 * Decrements semaphore 0 of semaphore_id by one (blocking semop, no flags).
 * Prints "lock fail" and exits with status 1 if semop() fails.
 */
void lock() {
    struct sembuf acquire = { .sem_num = 0, .sem_op = -1, .sem_flg = 0 };

    if (semop(semaphore_id, &acquire, 1) >= 0)
        return;

    fprintf(stderr, "lock fail\n");
    exit(1);
}
/*
 * Increments semaphore 0 of semaphore_id by one.
 * Prints "unlock fail" and exits with status 1 if semop() fails.
 */
void unlock() {
    struct sembuf release = { .sem_num = 0, .sem_op = 1, .sem_flg = 0 };

    if (semop(semaphore_id, &release, 1) >= 0)
        return;

    fprintf(stderr, "unlock fail\n");
    exit(1);
}
/*
 * SIGINT handler shared by the parent and the workers.
 *
 * In a worker (unique_id >= 0) it only raises force_exit so the service
 * loops wind down. In the parent (unique_id < 0) it forwards SIGINT to every
 * worker pid recorded in shared memory.
 */
void sighandler(int sig) {
    /* NOTE(review): printf is not async-signal-safe; kept because it only
     * runs when debug_mode is switched on. */
    if (debug_mode) printf("[UID:%d] sighandler:%d\n", unique_id, sig);

    if (unique_id >= 0) {
        force_exit = 1;
        return;
    }

    if (shared == NULL || shared->ppd == NULL)
        return;

    for (int i = 0; i < NPROCESS; i++) {
        int pid = shared->ppd[i].pid;
        /* Only signal real children: kill(0, ...) targets the whole process
         * group and kill(-1, ...) targets every process we may signal. */
        if (pid > 0)
            kill(pid, SIGINT);
    }
}
/*
 * Body of one benchmark worker; never returns (ends with exit()).
 *
 * Creates a client-only libwebsockets context, opens NCONS connections to
 * localhost:7681, services the context until all of them are established,
 * then runs the measured ping-pong phase until force_exit is set or the
 * service call fails. The aggregated statistics are copied into ppd->pcd.
 *
 * ppd - this worker's slot in shared memory; only ppd->pcd is written.
 */
void worker_process(struct per_process_data *ppd) {
    // set log level for lws
    lws_set_log_level(7, lwsl_emit_syslog);

    // allocate context data (zero-initialised)
    struct per_context_data *pcd = calloc(1, sizeof *pcd);
    if (pcd == NULL) {
        fprintf(stderr, "out of memory\n");
        exit(1);
    }
    // large sentinel so the first session minimum replaces it
    pcd->min_response_time = 1 << 20;

    // set context info
    struct lws_context_creation_info info;
    memset(&info, 0, sizeof(info));
    info.port = CONTEXT_PORT_NO_LISTEN;
    info.protocols = protocols;
#ifndef LWS_NO_EXTENSIONS
    info.extensions = libwebsocket_get_internal_extensions();
#endif
    info.gid = -1;
    info.uid = -1;
    info.user = pcd;

    // create context
    struct libwebsocket_context *context = libwebsocket_create_context(&info);
    if (context == NULL) {
        fprintf(stderr, "context creation failed\n");
        free(pcd);
        exit(1);
    }

    // create sessions
    const char *address = "localhost";
    int port = 7681;
    int use_ssl = 0;
    int ietf_version = -1; // using latest version
    for (int i = 0; i < NCONS; i++) {
        struct libwebsocket *wsi =
            libwebsocket_client_connect(context, address, port, use_ssl,
                                        "/", address, address,
                                        PROTOCAL_NAME, ietf_version);
        // check the handle just returned, not the array that used to hold it
        if (wsi == NULL) {
            fprintf(stderr, "connection failed\n");
            exit(1);
        }
    }
    printf("[UID:%d] Connect to server\n", unique_id);

    // service the context until every session has been established
    int ret = 0;
    while (ret >= 0 && force_exit == 0) {
        ret = libwebsocket_service(context, 100);
        if (pcd->next_session_id == NCONS) break;
    }

    printf("[UID:%d] Start\n", unique_id);
    struct timeval start_time, end_time;
    gettimeofday(&start_time, NULL);
    pcd->start = 1;
    libwebsocket_callback_on_writable_all_protocol(&protocols[0]);

    while (ret >= 0 && force_exit == 0) {
        ret = libwebsocket_service(context, 100);
        // when throttled, reopen the window after a second and restart writes
        if (rpc_limit && pcd->limit_count >= LIMIT) {
            struct timeval cur_time;
            gettimeofday(&cur_time, NULL);
            if (calc_time_diff(cur_time, pcd->prev_time) >= 1.0) {
                pcd->prev_time = cur_time;
                pcd->limit_count = 0;
                libwebsocket_callback_on_writable_all_protocol(&protocols[0]);
            }
        }
    }
    gettimeofday(&end_time, NULL);
    printf("[UID:%d] Finish\n", unique_id);

    // destroy before publishing: pcd's totals are filled by the CLOSED callbacks
    libwebsocket_context_destroy(context);
    context = NULL;

    pcd->overall_time = calc_time_diff(end_time, start_time);
    ppd->pcd = *pcd;
    free(pcd);
    pcd = NULL;
    exit(0);
}
/*
 * Entry point: sets up a shared-memory block, forks NPROCESS workers, waits
 * for all of them and prints each worker's statistics.
 *
 * Returns 0 on success, 1 if the shared memory or a worker cannot be created.
 */
int main(void) {
    const char *version = lws_get_library_version();
    printf("libwebsockets version %s\n", version);

    // shared block layout: [struct shared_data][per_process_data x NPROCESS]
    size_t shm_size = sizeof(struct shared_data) + sizeof(struct per_process_data) * NPROCESS;
    int segment = shmget(IPC_PRIVATE, shm_size, S_IRUSR|S_IWUSR);
    if (segment < 0) {
        perror("shmget");
        return 1;
    }
    void *mem = shmat(segment, NULL, 0);
    if (mem == (void *)-1) {
        perror("shmat");
        shmctl(segment, IPC_RMID, NULL);
        return 1;
    }
    // mark the segment for removal now; it stays usable while attached and
    // is released once the last process detaches, even after a crash
    if (shmctl(segment, IPC_RMID, NULL) < 0)
        perror("shmctl");

    shared = (struct shared_data *)mem;
    memset(shared, 0, shm_size);
    // the slots start right after the header; "shared + 1" advances by exactly
    // one struct shared_data (pointer arithmetic is already scaled)
    shared->ppd = (struct per_process_data *)(shared + 1);

    // create worker processes
    unique_id = -1;
    for (int i = 0; i < NPROCESS; i++) {
        pid_t pid = fork();
        if (pid < 0) {
            perror("fork");
            // stop the workers already started instead of leaving them running
            for (int j = 0; j < i; j++)
                kill(shared->ppd[j].pid, SIGINT);
            for (int j = 0; j < i; j++)
                wait(NULL);
            return 1;
        }
        if (pid == 0) {
            unique_id = i;
            break;
        }
        shared->ppd[i].pid = pid;
        shared->ppd[i].unique_id = i;
    }

    // set signal handler
    signal(SIGINT, sighandler);

    // start workers (worker_process does not return)
    if (unique_id >= 0)
        worker_process(&shared->ppd[unique_id]);

    // wait for workers; a wait() interrupted by SIGINT is retried so that it
    // is not mistaken for a finished child
    for (int i = 0; i < NPROCESS; i++) {
        int status = 0;
        pid_t done;
        do {
            done = wait(&status);
        } while (done < 0 && errno == EINTR);
        if (done < 0)
            break; // no children left
        if (debug_mode) printf("some process finished: status %d\n", status);
    }

    puts("");
    printf("Results\n");
    for (int i = 0; i < NPROCESS; i++) {
        struct per_process_data *cur = &shared->ppd[i];
        // guard the ratios so an idle worker reports 0 rather than nan/inf
        double rps = cur->pcd.overall_time > 0.0
                         ? cur->pcd.wcount / cur->pcd.overall_time : 0.0;
        double average = cur->pcd.wcount > 0
                             ? cur->pcd.total_response_time / cur->pcd.wcount : 0.0;

        printf("Process %d's result\n", i);
        printf(" Overall time: %f\n", cur->pcd.overall_time);
        printf(" Requests Per Sec: %f\n", rps);
        printf(" Read:%llu Write:%llu\n", cur->pcd.rcount, cur->pcd.wcount);
        printf(" Total Response Time: %f\n", cur->pcd.total_response_time);
        printf(" Response Time: average:%f max:%f: min:%f\n",
               average,
               cur->pcd.max_response_time, cur->pcd.min_response_time);
        puts("");
    }
    return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment