Skip to content

Instantly share code, notes, and snippets.

@mgerdts
Last active November 22, 2022 16:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mgerdts/fa61ce1780c0cb75ec22b9f4c20a6cef to your computer and use it in GitHub Desktop.
Save mgerdts/fa61ce1780c0cb75ec22b9f4c20a6cef to your computer and use it in GitHub Desktop.

This originally appeared in this slack message.

I've implemented your latest suggestions, which I think will work out ok. I'll push to gerrit shortly. I also hacked up a version to compare mutex and spinlock performance. With each thread taking the lock 50,000 times, the mutex takes 4% longer (non-debug, no asan, no ubsan) when the threads get fair access to the lock.

Starting test contend_spin
  Worker    Delay  Wait us  Hold us Total us
       0        3   251971   153829   405801
       1        5   151200   253713   404914
PASS test contend_spin
Starting test contend_mutex
  Worker    Delay  Wait us  Hold us Total us
       0        3   263877   157678   421556
       1        5   164256   259500   423757
PASS test contend_mutex

Sometimes with the mutex, the access is not fair and one thread finishes much more quickly than the other. In this case, one mutex thread finishes in 67% of the time seen with a spinlock, and the other thread takes about 10% longer than a spinlock.

Starting test contend_spin
  Worker    Delay  Wait us  Hold us Total us
       0        3   251866   153823   405690
       1        5   152406   253608   406014
PASS test contend_spin
Starting test contend_mutex
FAIL: spdk_lock.c:230 contend_end g_contend_data[0].wait_time > g_contend_data[1].wait_time
  Worker    Delay  Wait us  Hold us Total us
       0        3    90497   182958   273456
       1        5   190311   254881   445193
FAIL test contend_mutex (1 failed assertions)
Starting test hold_by_poller

I also did a test with errorcheck mutexes and I couldn't see a consistent penalty for errorcheck mutexes compared to normal mutexes.

/* SPDX-License-Identifier: BSD-3-Clause
* Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
*/
#include "spdk/stdinc.h"
#include "spdk/env.h"
#include "spdk/event.h"
#include "spdk/string.h"
#include "spdk/thread.h"
#include "spdk/util.h"
#include "spdk_internal/thread.h"
/*
* Used by multiple tests
*/
typedef void (*test_setup_fn)(void);
struct test {
const char *name;
uint32_t thread_count;
test_setup_fn setup_fn;
spdk_poller_fn end_fn;
struct spdk_poller *poller;
};
#define ASSERT(cond) do { \
if (cond) { \
g_pass++; \
} else { \
g_fail++; \
printf("FAIL: %s:%d %s %s\n", __FILE__, __LINE__, __func__, #cond); \
} \
} while (0);
#define WORKER_COUNT 2
static uint32_t g_pass;
static uint32_t g_fail;
struct spdk_spinlock g_spinlock;
static struct spdk_thread *g_thread[WORKER_COUNT];
/* Main test logic runs on this thread. */
struct spdk_thread *g_default_thread;
/* Protects g_lock_error_count during updates by spdk_spin_abort_fn(). */
pthread_mutex_t g_lock_lock = PTHREAD_MUTEX_INITIALIZER;
uint32_t g_lock_error_count[SPIN_ERR_LAST];
static void launch_next_test(void *arg);
static bool
check_spin_err_count(enum spin_error *expect)
{
enum spin_error i;
bool ret = true;
for (i = SPIN_ERR_NONE; i < SPIN_ERR_LAST; i++) {
if (g_lock_error_count[i] != expect[i]) {
printf("FAIL: %s: Error %d expected %u, got %u\n", __func__, i,
expect[i], g_lock_error_count[i]);
ret = false;
}
}
return ret;
}
/* A spdk_spin_abort_fn() implementation */
static void
do_not_abort(enum spin_error error)
{
struct spdk_thread *thread = spdk_get_thread();
uint32_t i;
/*
* Only count on threads for the current test. Those from a previous test may continue to
* rack up errors in their death throes. A real application will abort() or exit() on the
* first error.
*/
for (i = 0; i < SPDK_COUNTOF(g_thread); i++) {
if (g_thread[i] != thread) {
continue;
}
ASSERT(error >= SPIN_ERR_NONE && error < SPIN_ERR_LAST);
if (error >= SPIN_ERR_NONE && error < SPIN_ERR_LAST) {
pthread_mutex_lock(&g_lock_lock);
g_lock_error_count[error]++;
pthread_mutex_unlock(&g_lock_lock);
}
}
}
/*
* contend - make sure that two concurrent threads can take turns at getting the lock
*/
struct contend_worker_data {
struct spdk_poller *poller;
uint64_t wait_time;
uint64_t hold_time;
uint32_t increments;
uint32_t delay_us;
uint32_t bit;
};
static uint32_t g_contend_remaining;;
static uint32_t g_get_lock_times = 50000;
static struct contend_worker_data g_contend_data[WORKER_COUNT] = {
{ .bit = 0, .delay_us = 3 },
{ .bit = 1, .delay_us = 5 },
};
static inline uint64_t
timediff(struct timespec *ts0, struct timespec *ts1)
{
return (ts1->tv_sec - ts0->tv_sec) * SPDK_SEC_TO_NSEC + ts1->tv_nsec - ts0->tv_nsec;
}
static uint32_t g_contend_word;
static pthread_mutex_t g_contend_mutex = PTHREAD_MUTEX_INITIALIZER;
static bool g_contend_use_spinlock;
static int
contend_worker_fn(void *arg)
{
struct contend_worker_data *data = arg;
struct timespec ts0, ts1;
const uint32_t mask = 1 << data->bit;
clock_gettime(CLOCK_MONOTONIC, &ts0);
if (g_contend_use_spinlock) {
spdk_spin_lock(&g_spinlock);
} else {
pthread_mutex_lock(&g_contend_mutex);
}
clock_gettime(CLOCK_MONOTONIC, &ts1);
data->wait_time += timediff(&ts0, &ts1);
if (data->increments == g_get_lock_times) {
assert(data->poller != NULL);
g_contend_remaining--;
spdk_poller_unregister(&data->poller);
assert(data->poller == NULL);
} else {
switch (data->increments & 0x1) {
case 0:
ASSERT((g_contend_word & mask) == 0);
g_contend_word |= mask;
break;
case 1:
ASSERT((g_contend_word & mask) == mask);
g_contend_word ^= mask;
break;
default:
abort();
}
data->increments++;
spdk_delay_us(data->delay_us);
}
if (g_contend_use_spinlock) {
spdk_spin_unlock(&g_spinlock);
} else {
pthread_mutex_unlock(&g_contend_mutex);
}
clock_gettime(CLOCK_MONOTONIC, &ts0);
data->hold_time += timediff(&ts1, &ts0);
return SPDK_POLLER_BUSY;
}
static void
contend_start_worker_poller(void *ctx)
{
struct contend_worker_data *data = ctx;
data->hold_time = 0;
data->wait_time = 0;
data->increments = 0;
data->poller = SPDK_POLLER_REGISTER(contend_worker_fn, data, 0);
if (data->poller == NULL) {
fprintf(stderr, "Failed to start poller\n");
abort();
}
}
static void
contend_setup_spin(void)
{
uint32_t i;
g_contend_use_spinlock = true;
memset(&g_spinlock, 0, sizeof(g_spinlock));
spdk_spin_init(&g_spinlock);
g_contend_remaining = SPDK_COUNTOF(g_contend_data);
/* Add a poller to each thread */
for (i = 0; i < SPDK_COUNTOF(g_contend_data); i++) {
spdk_thread_send_msg(g_thread[i], contend_start_worker_poller, &g_contend_data[i]);
}
}
static void
contend_setup_mutex(void)
{
uint32_t i;
g_contend_use_spinlock = false;
pthread_mutex_init(&g_contend_mutex, NULL);
g_contend_remaining = SPDK_COUNTOF(g_contend_data);
/* Add a poller to each thread */
for (i = 0; i < SPDK_COUNTOF(g_contend_data); i++) {
spdk_thread_send_msg(g_thread[i], contend_start_worker_poller, &g_contend_data[i]);
}
}
static void
contend_setup_mutex_ec(void)
{
uint32_t i;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
pthread_mutex_init(&g_contend_mutex, &attr);
pthread_mutexattr_destroy(&attr);
g_contend_use_spinlock = false;
g_contend_remaining = SPDK_COUNTOF(g_contend_data);
/* Add a poller to each thread */
for (i = 0; i < SPDK_COUNTOF(g_contend_data); i++) {
spdk_thread_send_msg(g_thread[i], contend_start_worker_poller, &g_contend_data[i]);
}
}
static int
contend_end(void *arg)
{
struct test *test = arg;
enum spin_error expect[SPIN_ERR_LAST] = { 0 };
uint32_t i;
if (g_contend_remaining != 0) {
return SPDK_POLLER_IDLE;
}
ASSERT(check_spin_err_count(expect));
ASSERT(g_contend_data[0].wait_time > g_contend_data[1].wait_time);
ASSERT(g_contend_data[0].increments == g_contend_data[1].increments);
printf("%8s %8s %8s %8s %8s\n", "Worker", "Delay", "Wait us", "Hold us", "Total us");
for (i = 0; i < SPDK_COUNTOF(g_contend_data); i++) {
printf("%8" PRIu32 " %8" PRIu32 " %8" PRIu64 " %8" PRIu64 " %8" PRIu64 "\n",
i, g_contend_data[i].delay_us,
g_contend_data[i].wait_time / 1000, g_contend_data[i].hold_time / 1000,
(g_contend_data[i].wait_time + g_contend_data[i].hold_time) / 1000);
}
spdk_poller_unregister(&test->poller);
spdk_thread_send_msg(g_default_thread, launch_next_test, NULL);
return SPDK_POLLER_BUSY;
}
/*
* hold_by_poller - a lock held by a poller when it returns trips an assert
*/
struct spdk_poller *g_poller;
static int
hold_by_poller(void *arg)
{
static int times_called = 0;
enum spin_error expect[SPIN_ERR_LAST] = { 0 };
/*
* This polller will be called twice, each time trying to take the spinlock.
*/
switch (times_called) {
case 0:
ASSERT(check_spin_err_count(expect));
break;
case 1:
expect[SPIN_ERR_HOLD_DURING_SWITCH] = 1;
ASSERT(check_spin_err_count(expect));
break;
default:
abort();
}
spdk_spin_lock(&g_spinlock);
memset(expect, 0, sizeof(expect));
switch (times_called) {
case 0:
ASSERT(check_spin_err_count(expect));
break;
case 1:
expect[SPIN_ERR_DEADLOCK] = 1;
expect[SPIN_ERR_HOLD_DURING_SWITCH] = 1;
ASSERT(check_spin_err_count(expect));
spdk_poller_unregister(&g_poller);
break;
default:
abort();
}
times_called++;
return SPDK_POLLER_BUSY;
}
static void
hold_by_poller_start(void *arg)
{
memset(g_lock_error_count, 0, sizeof(g_lock_error_count));
memset(&g_spinlock, 0, sizeof(g_spinlock));
spdk_spin_init(&g_spinlock);
g_poller = spdk_poller_register(hold_by_poller, NULL, 0);
}
static void
hold_by_poller_setup(void)
{
spdk_thread_send_msg(g_thread[0], hold_by_poller_start, NULL);
}
static int
hold_by_poller_end(void *arg)
{
struct test *test = arg;
enum spin_error expect [SPIN_ERR_LAST] = { 0 };
/* Wait for hold_by_poller() to complete its work. */
if (g_poller != NULL) {
return SPDK_POLLER_IDLE;
}
/* Some final checks to be sure all the expected errors were seen */
expect[SPIN_ERR_DEADLOCK] = 1;
expect[SPIN_ERR_HOLD_DURING_SWITCH] = 2;
ASSERT(check_spin_err_count(expect));
/* All done, move on to next test */
spdk_poller_unregister(&test->poller);
spdk_thread_send_msg(g_default_thread, launch_next_test, NULL);
return SPDK_POLLER_BUSY;
}
/*
* hold_by_message - A message sent to a thread retains the lock when it returns.
*/
static bool g_hold_by_message_done;
static void
hold_by_message(void *ctx)
{
spdk_spin_lock(&g_spinlock);
g_hold_by_message_done = true;
}
static void
hold_by_message_setup(void)
{
memset(g_lock_error_count, 0, sizeof(g_lock_error_count));
memset(&g_spinlock, 0, sizeof(g_spinlock));
spdk_spin_init(&g_spinlock);
spdk_thread_send_msg(g_thread[0], hold_by_message, NULL);
}
static int
hold_by_message_end(void *arg)
{
struct test *test = arg;
enum spin_error expect [SPIN_ERR_LAST] = { 0 };
/* Wait for the message to be processed */
if (!g_hold_by_message_done) {
return SPDK_POLLER_IDLE;
}
/* Verify an error was seen */
expect[SPIN_ERR_HOLD_DURING_SWITCH] = 1;
ASSERT(check_spin_err_count(expect));
/* All done, move on to next test */
spdk_poller_unregister(&test->poller);
spdk_thread_send_msg(g_default_thread, launch_next_test, NULL);
return SPDK_POLLER_BUSY;
}
/*
* Test definitions
*/
static void
start_threads(uint32_t count)
{
struct spdk_cpuset *cpuset;
uint32_t i;
cpuset = spdk_cpuset_alloc();
if (cpuset == NULL) {
fprintf(stderr, "failed to allocate cpuset\n");
abort();
}
assert(count <= SPDK_COUNTOF(g_thread));
for (i = 0; i < count; i++) {
spdk_cpuset_zero(cpuset);
spdk_cpuset_set_cpu(cpuset, i, true);
g_thread[i] = spdk_thread_create("worker", cpuset);
if (g_thread[i] == NULL) {
fprintf(stderr, "failed to create thread\n");
abort();
}
}
spdk_cpuset_free(cpuset);
}
static void
stop_thread(void *arg)
{
struct spdk_thread *thread = arg;
spdk_thread_exit(thread);
}
static void
stop_threads(void)
{
uint32_t i;
for (i = 0; i < SPDK_COUNTOF(g_thread); i++) {
if (g_thread[i] == NULL) {
break;
}
spdk_thread_send_msg(g_thread[i], stop_thread, g_thread[i]);
g_thread[i] = NULL;
}
}
static uint32_t g_current_test;
static struct test g_tests[] = {
{"contend_spin", 2, contend_setup_spin, contend_end},
{"contend_mutex", 2, contend_setup_mutex, contend_end},
{"contend_ec_mutex", 2, contend_setup_mutex_ec, contend_end},
{"hold_by_poller", 1, hold_by_poller_setup, hold_by_poller_end},
{"hold_by_message", 1, hold_by_message_setup, hold_by_message_end},
};
static void
launch_next_test(void *arg)
{
struct test *test;
static uint32_t last_fail_count = 0;
assert(spdk_get_thread() == g_default_thread);
if (g_current_test != 0) {
const char *name = g_tests[g_current_test - 1].name;
if (g_fail == last_fail_count) {
printf("PASS test %s\n", name);
} else {
printf("FAIL test %s (%u failed assertions)\n", name,
g_fail - last_fail_count);
}
stop_threads();
}
if (g_current_test == SPDK_COUNTOF(g_tests)) {
spdk_app_stop(g_fail);
return;
}
test = &g_tests[g_current_test];
printf("Starting test %s\n", test->name);
start_threads(test->thread_count);
test->setup_fn();
test->poller = SPDK_POLLER_REGISTER(test->end_fn, test, 1000);
g_current_test++;
}
static void
start_tests(void *arg)
{
spdk_spin_abort_fn = do_not_abort;
g_default_thread = spdk_thread_create("default", NULL);
spdk_thread_send_msg(g_default_thread, launch_next_test, NULL);
}
int
main(int argc, char **argv)
{
struct spdk_app_opts opts;
char *me = argv[0];
int ret;
spdk_app_opts_init(&opts, sizeof(opts));
opts.name = "poller_perf";
opts.reactor_mask = "0x7";
spdk_app_start(&opts, start_tests, NULL);
spdk_app_fini();
printf("%s summary:\n", me);
printf(" %8u assertions passed\n", g_pass);
printf(" %8u assertions failed\n", g_fail);
if (g_pass + g_fail == 0) {
ret = 1;
} else {
ret = spdk_min(g_fail, 127);
}
return ret;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment