Skip to content

Instantly share code, notes, and snippets.

@danikin
Last active October 6, 2016 14:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save danikin/663ea6612f8e9ce9686c to your computer and use it in GitHub Desktop.
Load test for Tarantool
#include <errno.h>
#include <pthread.h>
#include <stdint.h> /* UINT32_MAX, used as the select limit */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

#include <tarantool/tarantool.h>
#include <tarantool/tnt_net.h>
#include <tarantool/tnt_opt.h>
// Benchmark configuration, shared read-only by every worker thread.
typedef struct test_params
{
// Server URI (copied from argv[1]; at most 255 chars plus NUL)
char address[256];
// Non-zero: issue selects; zero: issue replaces
int is_read_test;
// Non-zero: replies are drained by a dedicated read_thread per connection;
// zero: each worker collects its own replies after every batch
int is_response_in_thread;
// Requested total number of operations
// (currently unused: the send loop in do_test is endless)
int num_ops;
// Requests sent before a reply-collection pass (synchronous mode only)
int batch_size;
// Maximum number of requests per second that a client can stand
// (currently unused: the client-side throttling code is commented out)
int client_max_rps;
} test_params;
// Per-worker state; n_ops and latency are polled once a second by
// timer_thread (without synchronization — benchmark-grade counters).
typedef struct thread_data
{
// Shared test configuration (owned by main)
test_params *tp;
// Total requests sent by this worker so far
long long n_ops;
// Accumulated latency, microseconds (only updated by code that is
// currently commented out in do_test, so it stays 0)
long long latency;
// This worker's connection stream
struct tnt_stream *tnt;
} thread_data;
int NUM_THREADS = 30;
// Reader thread: endlessly drains replies from one server connection.
// ctx is the struct tnt_stream* of the connection to read from.
// Prints a progress line every 1,000,000 responses; never returns.
void *read_thread(void *ctx)
{
	struct tnt_stream *tnt = (struct tnt_stream*)ctx;
	// Was uninitialized in the original — reading it was undefined behavior.
	// long long so the counter cannot hit signed-int overflow (also UB).
	long long num_responses = 0;
	while (1)
	{
		struct tnt_reply reply;
		// Re-initialize every iteration: tnt_reply_free releases the
		// reply's buffers, so the struct must not be reused without init.
		tnt_reply_init(&reply);
		tnt->read_reply(tnt, &reply); // Read reply from server
		if (reply.code != 0)
			printf("Read reply failed %s\n", reply.error);
		tnt_reply_free(&reply); // Free reply
		++num_responses;
		if (!(num_responses % 1000000))
		{
			printf("num_responses: %lld\n", num_responses);
			fflush(stdout);
		}
	}
	return NULL; // not reached; keeps the pthread start-routine contract explicit
}
// Worker thread: opens one connection to the server and pushes an endless
// stream of batched requests (selects or replaces on space 512).
// ctx is a thread_data*; td->n_ops is incremented per request so that
// timer_thread can compute RPS.
// NOTE(review): n_ops is read concurrently by timer_thread without
// synchronization — tolerable for a benchmark counter, but formally a
// data race; an _Atomic counter would make it clean.
void *do_test(void *ctx)
{
	thread_data *td = (thread_data*)ctx;
	struct tnt_stream *tnt = tnt_net(NULL); /* SETUP */
	td->tnt = tnt;
	tnt_set(tnt, TNT_OPT_URI, td->tp->address);
	if (tnt_connect(tnt) < 0)
	{
		printf("Connection refused\n");
		exit(-1);
	}
	/* One request builder, reset and reused for every request */
	struct tnt_stream *tuple = tnt_object(NULL);
	/* Time-based key base so consecutive runs touch fresh keys */
	int k = (int)time(NULL);
	if (td->tp->is_response_in_thread)
	{
		/* Drain replies on a dedicated thread so the send loop never blocks */
		pthread_t read_pid;
		int r = pthread_create(&read_pid, NULL, &read_thread, tnt);
		if (r != 0) /* pthread_create returns a positive errno value, not -1 */
		{
			fprintf(stderr, "multithread_test: could not create thread, r=%d, errno='%s'\n", r, strerror(r));
			fflush(stderr);
		}
	}
	for (int i = 0; 1 /* intentionally endless: run until killed */; ++i)
	{
		for (int j = 0; j < td->tp->batch_size; ++j)
		{
			int key = k + i*td->tp->batch_size + j;
			if (td->tp->is_read_test)
			{
				/* select: [key], space 512, full-range limit */
				tnt_object_add_array(tuple, 1);
				tnt_object_add_int(tuple, key);
				tnt_select(tnt, 512, 0, UINT32_MAX, 0, 0, tuple);
			}
			else
			{
				/* replace: [key, key] into space 512 */
				tnt_object_add_array(tuple, 2);
				tnt_object_add_int(tuple, key);
				tnt_object_add_int(tuple, key);
				tnt_replace(tnt, 512, tuple); /* SEND REQUEST */
			}
			++td->n_ops;
			tnt_flush(tnt);
			tnt_object_reset(tuple);
		}
		if (!td->tp->is_response_in_thread)
		{
			/* Synchronous mode: expect exactly one reply per request in the batch */
			int num_responses = 0;
			struct tnt_iter it; tnt_iter_reply(&it, tnt);
			while (tnt_next(&it))
			{
				struct tnt_reply *r = TNT_IREPLY_PTR(&it);
				if (r->code != 0)
					printf("Insert failed %s\n", r->error);
				++num_responses;
			}
			if (num_responses != td->tp->batch_size)
			{
				fprintf(stderr, "Invalid number of responses: %d, batch size is %d\n", num_responses, td->tp->batch_size);
				fflush(stderr); /* was fflush(stdout) after a write to stderr */
			}
		}
	}
	/* TEARDOWN — not reached (endless loop above); kept for completeness */
	tnt_close(tnt);
	tnt_stream_free(tuple);
	tnt_stream_free(tnt);
	return NULL;
}
// Stats thread: once per second, sums the per-worker counters and prints
// the requests-per-second delta plus the average latency over that second.
// ctx is the array of NUM_THREADS thread_data entries; never returns.
// NOTE(review): counters are read without synchronization (benchmark-grade
// race); latency is only accumulated by code currently commented out in
// do_test, so the latency column effectively prints 0.
void *timer_thread(void *ctx)
{
	thread_data *tds = (thread_data*)ctx;
	long long prev_ops = 0, prev_latency = 0;
	while (1)
	{
		sleep(1);
		long long ops = 0, latency = 0;
		for (int i = 0; i < NUM_THREADS; ++i)
		{
			ops += tds[i].n_ops;
			latency += tds[i].latency;
		}
		long long rps = ops - prev_ops;
		long long latency_diff = latency - prev_latency;
		/* Guard the division: with no traffic rps is 0 and the original
		 * printed inf/nan; also print rps as %lld instead of truncating
		 * through an int cast. */
		double avg_latency = rps ? latency_diff*1e-6/rps : 0.0;
		printf("RPS: %lld, average latency: %f\n", rps, avg_latency);
		fflush(stdout);
		prev_ops = ops;
		prev_latency = latency;
	}
	return NULL; /* not reached */
}
// Entry point: parse the command line, spawn NUM_THREADS worker threads
// plus one stats thread, then join the workers.  The workers loop forever,
// so in practice the process runs until killed.
// Usage: tar_test address:port read|write num_threads num_ops batch_size client_max_rps
int main(int argc, char *argv[])
{
	if (argc < 7)
	{
		printf("Usage: tar_test address:port read|write num_threads num_ops batch_size client_max_rps\n");
		return 0;
	}
	test_params tp;
	/* Bound by the buffer itself rather than a magic 255 */
	if (strlen(argv[1]) < sizeof(tp.address))
		strcpy(tp.address, argv[1]);
	else
	{
		fprintf(stderr, "Address is too long\n");
		return 1;
	}
	tp.is_read_test = !strcmp(argv[2], "read");
	NUM_THREADS = atoi(argv[3]);
	if (NUM_THREADS <= 0)
	{
		/* A non-positive VLA size below would be undefined behavior */
		fprintf(stderr, "num_threads must be a positive integer\n");
		return 1;
	}
	tp.num_ops = atoi(argv[4]);
	tp.batch_size = atoi(argv[5]);
	tp.client_max_rps = atoi(argv[6]);
	tp.is_response_in_thread = 1; /* replies drained by read_thread, not inline */
	pthread_t pids[NUM_THREADS];
	thread_data tds[NUM_THREADS];
	for (int i = 0; i < NUM_THREADS; ++i)
	{
		tds[i].tp = &tp;
		tds[i].n_ops = 0;
		tds[i].latency = 0;
		int r = pthread_create(pids + i, NULL, &do_test, tds + i);
		if (r != 0) /* pthread_create returns a positive errno value, not -1 */
		{
			fprintf(stderr, "multithread_test: could not create thread, i=%d, r=%d, errno='%s'\n", i, r, strerror(r));
			fflush(stderr);
		}
	}
	pthread_t timer_pid;
	int r = pthread_create(&timer_pid, NULL, &timer_thread, tds);
	if (r != 0)
	{
		fprintf(stderr, "multithread_test: could not create thread, r=%d, errno='%s'\n", r, strerror(r));
		fflush(stderr);
	}
	for (int i = 0; i < NUM_THREADS; ++i)
	{
		void *v;
		r = pthread_join(pids[i], &v);
		if (r != 0) /* pthread_join likewise returns a positive errno value */
		{
			fprintf(stderr, "multithread_test: could not join thread, i=%d, r=%d, errno='%s'\n", i, r, strerror(r));
			fflush(stderr);
		}
	}
	return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment