Skip to content

Instantly share code, notes, and snippets.

@mingodad
Last active March 31, 2016 08:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mingodad/79225c88f8dce0f174f5 to your computer and use it in GitHub Desktop.
Save mingodad/79225c88f8dce0f174f5 to your computer and use it in GitHub Desktop.
Test program to check how SQLite performs under heavy concurrency
/* To build it with gcc
gcc -std=c99 -O2 -g -o sqlite-wal-test \
-DSQLITE_OMIT_LOAD_EXTENSION=1 \
-DSQLITE_USE_URI=1 \
main.c sqlite3.c -lpthread
*/
/* To run it with shell script
#!/bin/sh
data_size=256
num_threads=2
use_wal=1
use_synchronous=0
busy_retry=200
busy_retry_useconds=100000
start_loop_useconds=0
mid_loop_useconds=0
use_memory_db=0
./sqlite-wal-test ${data_size} ${num_threads} \
${use_wal} ${use_synchronous} \
${busy_retry} ${busy_retry_useconds} \
${start_loop_useconds} ${mid_loop_useconds} \
${use_memory_db}
*/
/* To run it with shell script removing the database first
#!/bin/sh
rm sessions.db
./run-it
*/
/* Output when running
Update rate: total=458547, last_total=423241, rate=35306, avg1=17653, avg2=38212
Update busy rate: total=124, last_total=113, rate=11, avg1=5, avg2=10
Update failed rate: total=0, last_total=0, rate=0, avg1=0, avg2=0
Read busy rate: total=0, last_total=0, rate=0, avg1=0, avg2=0
Read failed rate: total=0, last_total=0, rate=0, avg1=0, avg2=0
This is intended to test how fast sqlite3 can be on a system (cpu/memory/threads),
and it lets us change the following parameters from the command line:
data_size=256
num_threads=2
use_wal=1
use_synchronous=0
busy_retry=200
busy_retry_useconds=100000
start_loop_useconds=0
mid_loop_useconds=0
use_memory_db=0
And you'll get an output like this:
Update rate: total=458547, last_total=423241, rate=35306, avg1=17653, avg2=38212
Update busy rate: total=124, last_total=113, rate=11, avg1=5, avg2=10
Update failed rate: total=0, last_total=0, rate=0, avg1=0, avg2=0
Read busy rate: total=0, last_total=0, rate=0, avg1=0, avg2=0
Read failed rate: total=0, last_total=0, rate=0, avg1=0, avg2=0
Where :
Update rate = How many updates are done per second
Update busy rate = How many times sqlite returned SQLITE_BUSY (busy_retry_useconds)
Update failed rate = How many times we've failed our timeout (busy_retry * busy_retry_useconds)
Read busy rate = Same as "Update busy rate" but for reads
Read failed rate = Same as "Update failed rate" but for reads
- total = cumulative total from the beginning of the program execution
- last_total = the previous total
- rate = number of operations in the last second
- avg1 = average of operations by thread on the last second
- avg2 = average of operations by second since the beginning of the program execution
*/
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#include <signal.h>
#include <unistd.h>
#include <errno.h>
#include <pthread.h>
#include "sqlite3.h"
/*
 * Report an error code and terminate the process.
 * `en` is stored into errno so that perror(msg) prints its description,
 * then the program exits with EXIT_FAILURE.
 * The do { } while (0) wrapper makes the expansion behave as one statement.
 */
#define handle_error_en(en, msg) \
do { errno = en; perror(msg); exit(EXIT_FAILURE); } while (0)
/* Lower bound that main() enforces on the requested payload size. */
#define MIN_DATA_SIZE 256
/* Requested payload size; createSessionData() overwrites it with the real payload length. */
static int data_size;
/* Non-zero: workers open "file::memory:?cache=shared" instead of "sessions.db". */
static int use_memory_db = 0;
/* Non-zero: workers issue "PRAGMA journal_mode = WAL;" (file database only). */
static int use_wal = 0;
/* Value substituted into "PRAGMA synchronous = %d;" (file database only). */
static int use_synchronous = 0;
/* Number of worker threads; also the length of every counter array below. */
static int threads_count;
/*
 * Per-thread counters, indexed by the worker's thread index and allocated in main().
 * Each worker increments only its own slot; the stats thread reads all slots.
 * Note: `int * volatile` makes the pointer itself volatile, not the ints it points to.
 */
/* Updates that completed with SQLITE_OK. */
static int * volatile update_count;
/* Times an update step returned SQLITE_BUSY/SQLITE_LOCKED and was retried. */
static int * volatile update_busy_count;
/* Updates that were still busy/locked once the retry loop ended. */
static int * volatile update_busy_failed;
/* Times a select step returned SQLITE_BUSY/SQLITE_LOCKED and was retried. */
static int * volatile read_busy_count;
/* Selects that did not end with SQLITE_ROW. */
static int * volatile read_failed;
/* Maximum number of attempts made by each retry loop. */
static int busy_retry_count = 10;
/* Microseconds slept (usleep) between attempts. */
static int busy_retry_useconds = 250000;
/* Microseconds slept at the top of each worker iteration (0 disables the sleep). */
static int start_loop_useconds = 1000;
/* Microseconds slept between the read and the update (0 disables the sleep). */
static int mid_loop_useconds = 1000;
/* Fixed row key, address and seed text shared by every worker. */
static const char session_id[] = "ABC1234567890123456789";
static const char ip_address[] = "127.0.0.1";
static const char data[] = "a kind of initial data";
/* Heap buffer built by createSessionData(); freed at the end of main(). */
static char *session_data = NULL;
/*
 * Run flag polled by the worker and stats loops.
 * It is written from a signal handler, so it is declared volatile sig_atomic_t.
 */
static volatile sig_atomic_t keepRunning = 1;

/*
 * SIGINT handler: request a clean shutdown by clearing keepRunning.
 * The signal number is not needed.
 */
void intHandler(int dummy) {
    (void)dummy;
    keepRunning = 0;
}
static const char create_sql[] = "create table if not exists sessions(id text primary key, data text, ip_address, cdate datetime default CURRENT_TIMESTAMP, mdate datetime);";
static const char insert_sql[] = "insert or ignore into sessions(id, data, ip_address, mdate) values(?,?,?, CURRENT_TIMESTAMP)";
static const char update_sql[] = "update sessions set data=?, mdate=CURRENT_TIMESTAMP where id=?";
static const char select_sql[] = "select data from sessions where id=? and ip_address=?";
//static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
/*
 * Build the payload that workers write back on every update.
 *
 * The payload is the text session_id + ip_address + data repeated as many
 * times as fits strictly below the requested data_size, and it is always
 * NUL terminated. A buffer from a previous call is released first.
 *
 * On return the global data_size holds the payload length (terminator
 * excluded) and session_data points at the buffer.
 *
 * Returns the payload length, or -1 when the allocation fails (session_data
 * is then NULL and data_size is 0).
 */
int createSessionData()
{
    const int id_len = (int)sizeof(session_id) - 1;
    const int ip_len = (int)sizeof(ip_address) - 1;
    const int text_len = (int)sizeof(data) - 1;
    /* one repetition plus room for the terminating NUL */
    const int unit_size = id_len + ip_len + text_len + 1;
    int used = 0;

    free(session_data);
    /* always allocate at least one byte so an empty payload is still a valid string */
    session_data = malloc(data_size > 0 ? (size_t)data_size : 1);
    if(session_data == NULL)
    {
        data_size = 0;
        return -1;
    }
    session_data[0] = '\0';

    /* same condition as "used + unit_size < data_size", written so it cannot overflow */
    while(data_size - used > unit_size)
    {
        memcpy(session_data + used, session_id, (size_t)id_len);
        used += id_len;
        memcpy(session_data + used, ip_address, (size_t)ip_len);
        used += ip_len;
        memcpy(session_data + used, data, (size_t)text_len);
        used += text_len;
        session_data[used] = '\0';
    }

    data_size = used;
    return data_size;
}
/*
 * Compile `sql` (of `sql_size` bytes) on `db` into `*stmt`.
 *
 * Any failure is retried, not only busy errors: when the database file is
 * new, several threads race to create the table, so the losers wait here
 * until the winner has created it. Up to busy_retry_count attempts are made,
 * sleeping busy_retry_useconds after each failed one.
 *
 * Returns SQLITE_OK (0) on success. Returns -1, after reporting on stderr
 * with the thread index `pid`, when no attempt succeeded.
 */
int prepareStmt(int pid, sqlite3 *db, sqlite3_stmt **stmt, const char *sql, int sql_size)
{
    int attempt = 0;

    while(attempt < busy_retry_count)
    {
        int status = sqlite3_prepare_v2(db, sql, sql_size, stmt, NULL);
        if(status == SQLITE_OK)
        {
            return status;
        }
        usleep(busy_retry_useconds);
        ++attempt;
    }

    fprintf(stderr, "****Thread %d failed to prepare statements\n", pid);
    return -1;
}
/*
 * Evaluate `cmd` up to busy_retry_count times, sleeping busy_retry_useconds
 * after every SQLITE_BUSY / SQLITE_LOCKED result. If the final result is not
 * `check_code`, report it on stderr and return (void*)-1 from the ENCLOSING
 * function.
 *
 * Requirements at the expansion site: variables `busy_retry`, `rc` and `pid`
 * are in scope, and the enclosing function returns void *.
 * `cmd` is deliberately evaluated once per attempt.
 *
 * The do { } while (0) wrapper makes the expansion a single statement, so
 * `if(cond) BUSY_RETRY_OR_EXIT(...);` guards the retry loop and the result
 * check together.
 */
#define BUSY_RETRY_OR_EXIT(cmd, check_code, msg) \
do { \
    for(busy_retry=0; busy_retry < busy_retry_count; ++busy_retry) \
    { \
        rc = (cmd); \
        if( (rc == SQLITE_BUSY) || (rc == SQLITE_LOCKED) ) \
        { \
            usleep(busy_retry_useconds); \
            continue; \
        } \
        break; \
    } \
    if(rc != (check_code)) \
    { \
        fprintf(stderr, "****Thread %d error on %s = %d\n", pid, (msg), rc); \
        return (void*)-1; \
    } \
} while (0)
void *thread_runner(void *tp)
{
int pid = (int)tp; //pthread_self();
sqlite3 *db;
int rc = sqlite3_open( use_memory_db ? "file::memory:?cache=shared" : "sessions.db", &db);
if(rc == SQLITE_OK)
{
int busy_retry = 0;
sqlite3_stmt *stmt_insert, *stmt_update, *stmt_select;
if(!use_memory_db)
{
char buf[256];
snprintf(buf, sizeof(buf), "PRAGMA synchronous = %d;", use_synchronous);
BUSY_RETRY_OR_EXIT(sqlite3_exec(db, buf, NULL, NULL, NULL), SQLITE_OK, "PRAGMA synchronous");
if(use_wal) BUSY_RETRY_OR_EXIT(sqlite3_exec(db, "PRAGMA journal_mode = WAL;", NULL, NULL, NULL), SQLITE_OK, "PRAGMA journal_mode");
}
//pthread_mutex_lock( &mutex );
//create sessions table
BUSY_RETRY_OR_EXIT(sqlite3_exec(db, create_sql, NULL, NULL, NULL), SQLITE_OK, "sqlite3_exec");
//create prepared statements
if(prepareStmt(pid, db, &stmt_insert, insert_sql, sizeof(insert_sql)-1)) return (void*)-1;
if(prepareStmt(pid, db, &stmt_update, update_sql, sizeof(update_sql)-1)) return (void*)-1;
if(prepareStmt(pid, db, &stmt_select, select_sql, sizeof(select_sql)-1)) return (void*)-1;
//insert initial data
rc = sqlite3_bind_text(stmt_insert, 1, session_id, sizeof(session_id)-1, NULL);
rc = sqlite3_bind_text(stmt_insert, 2, data, sizeof(data)-1, NULL);
rc = sqlite3_bind_text(stmt_insert, 3, ip_address, sizeof(ip_address)-1, NULL);
BUSY_RETRY_OR_EXIT(sqlite3_step(stmt_insert), SQLITE_DONE, "initial data");
rc = sqlite3_reset(stmt_insert);
//pthread_mutex_unlock( &mutex );
//int count = 0;
//int failed = 0;
//start our busy long work
fprintf(stderr, "Thread %d start working %d\n", pid, rc);
while(keepRunning)
{
//printf("Managing session %d %d %d %d %d\n", pid, ++count, busy_retry, failed, rc);
//sleep(1);
if(start_loop_useconds) usleep(start_loop_useconds);
//one client arrived, let's get it's saved data
rc = sqlite3_bind_text(stmt_select, 1, session_id, sizeof(session_id)-1, NULL);
rc = sqlite3_bind_text(stmt_select, 2, ip_address, sizeof(ip_address)-1, NULL);
const unsigned char *saved_data = NULL;
for(busy_retry=0; busy_retry < busy_retry_count; ++busy_retry)
{
rc = sqlite3_step(stmt_select);
if( (rc == SQLITE_BUSY) || (rc == SQLITE_LOCKED) )
{
++read_busy_count[pid];
//sleep(1);
usleep(busy_retry_useconds);
continue;
}
break;
}
if(rc == SQLITE_ROW)
{
saved_data = sqlite3_column_text(stmt_select, 0);
}
else
{
sqlite3_reset(stmt_select);
++read_failed[pid];
continue;
}
rc = sqlite3_reset(stmt_select);
//sleep(1);
if(mid_loop_useconds) usleep(mid_loop_useconds);
//ok we served our client, let's save it's data
rc = sqlite3_bind_text(stmt_update, 1, saved_data ? session_data : data, saved_data ? data_size : -1, NULL);
rc = sqlite3_bind_text(stmt_update, 2, session_id, sizeof(session_id)-1, NULL);
for(busy_retry=0; busy_retry < busy_retry_count; ++busy_retry)
{
rc = sqlite3_step(stmt_update);
if( (rc == SQLITE_BUSY) || (rc == SQLITE_LOCKED) )
{
++update_busy_count[pid];
//sleep(1);
usleep(busy_retry_useconds);
continue;
}
break;
}
rc = sqlite3_reset(stmt_update);
switch(rc)
{
case SQLITE_LOCKED:
case SQLITE_BUSY:
++update_busy_failed[pid];
break;
case SQLITE_OK:
++update_count[pid];
break;
default:
fprintf(stderr, "update_busy_failed with error %d\n", rc);
}
}
sqlite3_finalize(stmt_insert);
sqlite3_finalize(stmt_update);
sqlite3_finalize(stmt_select);
sqlite3_close(db);
}
else
{
fprintf(stderr, "Thread %d failed to open the database\n", pid);
}
return tp;
}
/*
 * One statistics snapshot.
 * The total_* fields (and `total`) are sums of the per-thread counters;
 * the total_rate* fields are the difference from the previous snapshot.
 */
typedef struct {
/* sum of update_count[] */
int total;
/* sum of update_busy_count[] */
int total_update_busy_count;
/* sum of update_busy_failed[] */
int total_update_busy_failed;
/* sum of read_busy_count[] */
int total_read_busy_count;
/* sum of read_failed[] */
int total_read_failed;
/* total minus the previous snapshot's total */
int total_rate;
/* total_update_busy_count minus the previous snapshot's value */
int total_rate_update_busy_count;
/* total_update_busy_failed minus the previous snapshot's value */
int total_rate_update_busy_failed;
/* total_read_busy_count minus the previous snapshot's value */
int total_rate_read_busy_count;
/* total_read_failed minus the previous snapshot's value */
int total_rate_read_failed;
} show_stats_st;
/*
 * Add the counters of worker `i` to the running totals in `tmp`.
 * Only the total fields are touched; the rate fields are left as they are.
 */
void calcShowStatsForThread(show_stats_st *tmp, int i)
{
    const int updates = update_count[i];
    const int update_retries = update_busy_count[i];
    const int update_failures = update_busy_failed[i];
    const int read_retries = read_busy_count[i];
    const int read_failures = read_failed[i];

    tmp->total = tmp->total + updates;
    tmp->total_update_busy_count = tmp->total_update_busy_count + update_retries;
    tmp->total_update_busy_failed = tmp->total_update_busy_failed + update_failures;
    tmp->total_read_busy_count = tmp->total_read_busy_count + read_retries;
    tmp->total_read_failed = tmp->total_read_failed + read_failures;
}
/*
 * Fill `tmp` with a snapshot over all workers.
 * The totals of every thread are accumulated into `tmp`, then each rate
 * field is set to the difference between `tmp` and the previous snapshot
 * `last`. `tmp` is expected to be zeroed by the caller.
 */
void calcShowStats(show_stats_st *last, show_stats_st *tmp)
{
    int worker = 0;
    while(worker < threads_count)
    {
        calcShowStatsForThread(tmp, worker);
        worker++;
    }

    tmp->total_rate = tmp->total - last->total;
    tmp->total_rate_update_busy_count =
        tmp->total_update_busy_count - last->total_update_busy_count;
    tmp->total_rate_update_busy_failed =
        tmp->total_update_busy_failed - last->total_update_busy_failed;
    tmp->total_rate_read_busy_count =
        tmp->total_read_busy_count - last->total_read_busy_count;
    tmp->total_rate_read_failed =
        tmp->total_read_failed - last->total_read_failed;
}
/*
 * Print one five-line statistics report to stdout.
 *
 * For every counter: total (from `tmp`), last_total (from `last`), rate,
 * avg1 = rate per worker thread, avg2 = total per elapsed tick (`count`).
 *
 * `count` may be 0 when the program is interrupted before the first tick,
 * so both divisors are clamped to at least 1 to avoid a division by zero.
 */
void showStatsToStdout(show_stats_st *last, show_stats_st *tmp, int count)
{
    const int per_thread = (threads_count > 0) ? threads_count : 1;
    const int per_tick = (count > 0) ? count : 1;

    printf(
        "\nUpdate rate: total=%d, last_total=%d, rate=%d, avg1=%d, avg2=%d\n"
        "Update busy rate: total=%d, last_total=%d, rate=%d, avg1=%d, avg2=%d\n"
        "Update failed rate: total=%d, last_total=%d, rate=%d, avg1=%d, avg2=%d\n"
        "Read busy rate: total=%d, last_total=%d, rate=%d, avg1=%d, avg2=%d\n"
        "Read failed rate: total=%d, last_total=%d, rate=%d, avg1=%d, avg2=%d\n",
        /* updates */
        tmp->total, last->total,
        tmp->total_rate, tmp->total_rate / per_thread,
        tmp->total / per_tick,
        /* update retries */
        tmp->total_update_busy_count, last->total_update_busy_count,
        tmp->total_rate_update_busy_count, tmp->total_rate_update_busy_count / per_thread,
        tmp->total_update_busy_count / per_tick,
        /* update failures */
        tmp->total_update_busy_failed, last->total_update_busy_failed,
        tmp->total_rate_update_busy_failed, tmp->total_rate_update_busy_failed / per_thread,
        tmp->total_update_busy_failed / per_tick,
        /* read retries */
        tmp->total_read_busy_count, last->total_read_busy_count,
        tmp->total_rate_read_busy_count, tmp->total_rate_read_busy_count / per_thread,
        tmp->total_read_busy_count / per_tick,
        /* read failures */
        tmp->total_read_failed, last->total_read_failed,
        tmp->total_rate_read_failed, tmp->total_rate_read_failed / per_thread,
        tmp->total_read_failed / per_tick);
}
/*
 * Statistics thread entry point.
 *
 * While keepRunning is set: sleep one second, take a snapshot over all
 * workers, print it against the previous snapshot, and remember it.
 * After the loop, print one report per worker thread (compared against an
 * all-zero snapshot). Returns `p` unchanged.
 */
void *showStats(void *p)
{
    show_stats_st previous = {0};
    int count = 0;

    while(keepRunning)
    {
        show_stats_st current = {0};

        sleep(1);
        ++count;
        calcShowStats(&previous, &current);
        showStatsToStdout(&previous, &current, count);
        /* only the totals of the previous snapshot are ever read back */
        previous = current;
    }

    //On exit show per thread stats
    show_stats_st zero = {0};
    for(int worker = 0; worker < threads_count; ++worker)
    {
        show_stats_st single = {0};
        calcShowStatsForThread(&single, worker);
        printf("\nOn exit individual stats for thread %d after %d counts\n", worker, count);
        showStatsToStdout(&zero, &single, count);
    }
    return p;
}
int main(int argc, const char *argv[])
{
//int pid = getpid();
data_size = MIN_DATA_SIZE;
if(argc > 1)
{
data_size = atoi(argv[1]);
}
if(data_size < MIN_DATA_SIZE) data_size = MIN_DATA_SIZE;
createSessionData();
threads_count = 10;
if(argc > 2)
{
threads_count = atoi(argv[2]);
}
if(threads_count < 1) threads_count = 1;
use_wal = 0;
if(argc > 3)
{
use_wal = atoi(argv[3]);
}
use_synchronous = 0;
if(argc > 4)
{
use_synchronous = atoi(argv[4]);
}
busy_retry_count = 10;
if(argc > 5)
{
busy_retry_count = atoi(argv[5]);
}
busy_retry_useconds = 250000;
if(argc > 6)
{
busy_retry_useconds = atoi(argv[6]);
}
start_loop_useconds = 1000;
if(argc > 7)
{
start_loop_useconds = atoi(argv[7]);
}
mid_loop_useconds = 1000;
if(argc > 8)
{
mid_loop_useconds = atoi(argv[8]);
}
use_memory_db = 0;
if(argc > 9)
{
use_memory_db = atoi(argv[9]);
}
signal(SIGINT, intHandler);
printf("Starting our busy session management ! %d\n", data_size);
pthread_attr_t attr;
int rc = pthread_attr_init(&attr);
if (rc != 0) handle_error_en(rc, "pthread_attr_init");
rc = pthread_attr_setscope(&attr, PTHREAD_SCOPE_SYSTEM);
if (rc != 0) handle_error_en(rc, "pthread_attr_setscope");
pthread_t thread_stats;
pthread_t *threads = malloc(sizeof(pthread_t) * threads_count);
int update_count_size = sizeof(int) * threads_count;
update_count = malloc(update_count_size);
memset(update_count, 0, update_count_size);
update_busy_count = malloc(update_count_size);
memset(update_busy_count, 0, update_count_size);
update_busy_failed = malloc(update_count_size);
memset(update_busy_failed, 0, update_count_size);
read_busy_count = malloc(update_count_size);
memset(read_busy_count, 0, update_count_size);
read_failed = malloc(update_count_size);
memset(read_failed, 0, update_count_size);
rc = pthread_create( &thread_stats, &attr, showStats, NULL);
if(rc)
{
fprintf(stderr,"Error - pthread_create() return code: %d\n", rc);
exit(EXIT_FAILURE);
}
for(int i=0; i < threads_count; ++i)
{
rc = pthread_create( &threads[i], &attr, thread_runner, (void*)i);
if(rc)
{
fprintf(stderr,"Error - pthread_create() %d return code: %d\n", i, rc);
exit(EXIT_FAILURE);
}
}
pthread_join(thread_stats, NULL);
for(int i=0; i < threads_count; ++i)
{
pthread_join( threads[i], NULL);
}
free(update_busy_count);
free(update_busy_failed);
free(update_count);
free(read_failed);
free(read_busy_count);
free(threads);
free(session_data);
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment