@pkhuong, last active June 6, 2017 18:53
Relaxed revocable locks
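/* Lock implementation; presumably rlock.c, going by the "rlock.h" include below. */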
#define _GNU_SOURCE
#define RLOCK_SIGNAL_HANDLER
#include "rlock.h"
#include "rlock_process_status.h"
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <signal.h>
#include <stddef.h>
#include <stdlib.h>
#include <sys/syscall.h>
#include <ucontext.h>
#include <unistd.h>
#include <ck_pr.h>
#include <ck_stack.h>
#define OP_LIMIT (1UL << 16)
#define LIKELY(X) __builtin_expect(!!(X), 1)
#define UNLIKELY(X) __builtin_expect(!!(X), 0)
union rlock_owner_tid {
uint64_t bits;
struct {
int32_t tid;
uint32_t timestamp;
};
};
struct rlock_owner {
struct ck_stack_entry freelist_entry;
union rlock_owner_tid tid;
rlock_owner_seq_t seq;
/* MPMC: Asked to cancel up to here (inclusively). */
uint32_t cancel_sequence;
/* MPMC: Signaled to cancel up to here (inclusively). */
uint32_t signal_sequence;
/* SPMC: Read cancel ask up to here (inclusively). */
uint32_t acked_sequence;
uint32_t op_count;
struct rlock *critical_section;
struct rlock_owner *next;
} __attribute__((__aligned__(64)));
struct rlock_store {
uint64_t begin;
uint64_t end;
};
extern struct rlock_store __start_rlock_store_list[];
extern struct rlock_store __stop_rlock_store_list[];
static pthread_once_t init_control = PTHREAD_ONCE_INIT;
static pthread_key_t rlock_self_key;
static struct rlock_owner *owners = NULL;
static __thread struct rlock_owner *rlock_self = NULL;
static struct ck_stack owner_freelist __attribute__((__aligned__(16)));
#ifdef RLOCK_STATS
uint64_t rlock_evict = 0;
uint64_t rlock_evict_slow = 0;
uint64_t rlock_evict_fail = 0;
uint64_t rlock_evicted = 0;
uint64_t rlock_evicted_in_crit = 0;
#endif
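/*
 * Wraparound-safe "x <= y" on 22-bit sequence numbers: true iff y is at most
 * half a ring (2^21) ahead of x.  For example, mod_lte(0x3fffff, 1) is true
 * (1 follows the wrapped maximum), while mod_lte(1, 0x3fffff) is not.
 */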
static inline bool
mod_lte(uint32_t x, uint32_t y)
{
return ((y - x) % (1UL << 22)) < (1UL << 21);
}
static pid_t
gettid(void)
{
long r;
r = syscall(SYS_gettid);
assert(r >= 0);
return (pid_t)r;
}
static int
tgkill(int tgid, int tid, int sig)
{
int r;
r = syscall(SYS_tgkill, tgid, tid, sig);
assert(r == 0 || errno == ESRCH);
return r;
}
static uint32_t
self_timestamp(void)
{
/* XXX: return actual thread ts. */
return 0;
}
static void
rlock_key_recycle(void *value)
{
int r;
assert(value == rlock_self);
ck_stack_push_mpmc(&owner_freelist, &rlock_self->freelist_entry);
rlock_self = NULL;
r = pthread_setspecific(rlock_self_key, NULL);
assert(r == 0);
return;
}
static void
init_rlock(void)
{
struct sigaction action = {
.sa_sigaction = rlock_signal_handler,
.sa_flags = SA_SIGINFO
};
int r;
r = sigaction(RLOCK_SIGNAL, &action, &action);
assert(r == 0);
assert(action.sa_handler == SIG_DFL || action.sa_handler == SIG_IGN);
r = pthread_key_create(&rlock_self_key, rlock_key_recycle);
assert(r == 0);
return;
}
static struct rlock_owner *
allocate_self(void)
{
struct rlock_owner *ret;
int r;
r = pthread_once(&init_control, init_rlock);
assert(r == 0);
ret = (void *)ck_stack_pop_mpmc(&owner_freelist);
if (ret != NULL) {
ck_pr_fas_32(&ret->cancel_sequence, ret->seq.sequence);
} else {
struct rlock_owner *tos;
void *mem;
r = posix_memalign(&mem, 64, sizeof(struct rlock_owner));
assert(r == 0);
ret = mem;
*ret = (struct rlock_owner) {};
ret->seq.address = (uintptr_t)mem / 64;
ret->seq.sequence = 1;
tos = ck_pr_load_ptr(&owners);
for (;;) {
ret->next = tos;
ck_pr_fence_store();
if (ck_pr_cas_ptr_value(&owners, tos, ret, &tos)) {
break;
}
}
}
ret->tid.tid = gettid();
ret->tid.timestamp = self_timestamp();
ret->critical_section = NULL;
rlock_self = ret;
r = pthread_setspecific(rlock_self_key, ret);
assert(r == 0);
return ret;
}
static inline bool
update_self(struct rlock_owner *self)
{
rlock_owner_seq_t snapshot = { .bits = self->seq.bits };
uint32_t cancel_sequence = ck_pr_load_32(&self->cancel_sequence);
if (LIKELY(self->seq.sequence != cancel_sequence)) {
return false;
}
ck_pr_fas_32(&self->cancel_sequence, snapshot.sequence);
ck_pr_fas_32(&self->signal_sequence, snapshot.sequence);
ck_pr_fas_32(&self->acked_sequence, snapshot.sequence);
snapshot.sequence++;
ck_pr_fas_64(&self->seq.bits, snapshot.bits);
return true;
}
static inline struct rlock_owner *
get_self(void)
{
struct rlock_owner *self;
self = rlock_self;
if (UNLIKELY(self == NULL)) {
self = allocate_self();
}
update_self(self);
return self;
}
static bool
rlock_owner_stale(rlock_owner_seq_t owner)
{
struct rlock_owner *victim = (void *)((uintptr_t)owner.address * 64);
rlock_owner_seq_t snapshot;
if (victim == NULL) {
return true;
}
snapshot.bits = ck_pr_load_64(&victim->seq.bits);
return (snapshot.bits != owner.bits);
}
static bool
ensure_cancel_sequence(struct rlock_owner *victim, uint32_t sequence)
{
uint32_t actual;
uint32_t previous = (sequence - 1) % (1UL << 22);
ck_pr_cas_32_value(&victim->cancel_sequence, previous, sequence,
&actual);
return (actual == previous || actual == sequence);
}
static void
ensure_signal_sequence(struct rlock_owner *victim, uint32_t sequence)
{
union rlock_owner_tid snapshot;
uint32_t current;
snapshot.bits = ck_pr_load_64(&victim->tid.bits);
current = ck_pr_load_32(&victim->signal_sequence);
if (snapshot.bits == 0 || mod_lte(sequence, current)) {
return;
}
tgkill(getpid(), snapshot.tid, RLOCK_SIGNAL);
ck_pr_cas_32(&victim->signal_sequence, current, sequence);
return;
}
static bool
victim_running(struct rlock_owner *victim)
{
union rlock_owner_tid tid;
int status;
tid.bits = ck_pr_load_64(&victim->tid.bits);
if (tid.tid == 0) {
return false;
}
status = rlock_process_running(tid.tid, tid.timestamp);
/* no matching tid. */
if (status < 0) {
ck_pr_cas_64(&victim->tid.bits, tid.bits, 0);
return false;
}
return (status != 0);
}
void
rlock_owner_signal(union rlock_owner_seq owner)
{
struct rlock_owner *victim = (void *)((uintptr_t)owner.address * 64);
rlock_owner_seq_t snapshot;
uint32_t acked;
uint32_t sequence = owner.sequence;
if (victim == NULL) {
return;
}
snapshot.bits = ck_pr_load_64(&victim->seq.bits);
if (snapshot.bits != owner.bits) {
return;
}
acked = ck_pr_load_32(&victim->acked_sequence);
if (mod_lte(sequence, acked)) {
return;
}
ensure_cancel_sequence(victim, sequence);
return;
}
bool
rlock_owner_cancel(union rlock_owner_seq owner,
struct rlock *evict)
{
struct rlock_owner *victim = (void *)((uintptr_t)owner.address * 64);
rlock_owner_seq_t snapshot;
uint32_t acked;
uint32_t sequence = owner.sequence;
assert(evict != NULL);
/* Easy case: no owner. */
if (victim == NULL) {
return true;
}
snapshot.bits = ck_pr_load_64(&victim->seq.bits);
if (snapshot.bits != owner.bits) {
/* The victim has already moved on to a new sequence value. */
return true;
}
acked = ck_pr_load_32(&victim->acked_sequence);
if (mod_lte(sequence, acked)) {
/* We already have acked cancellation >= sequence. */
return true;
}
#ifdef RLOCK_STATS
ck_pr_inc_64(&rlock_evict);
#endif
/* Advance the victim's cancel counter to sequence. */
if (!ensure_cancel_sequence(victim, sequence)) {
/* Already advanced; nothing to do! */
return true;
}
if (victim_running(victim)) {
/* The victim isn't obviously scheduled out. */
/* See if we must ensure visibility of our cancel. */
snapshot.bits = ck_pr_load_64(&victim->seq.bits);
if (snapshot.bits == owner.bits) {
ensure_signal_sequence(victim, sequence);
}
#ifdef RLOCK_STATS
ck_pr_inc_64(&rlock_evict_fail);
#endif
return false;
}
if (ck_pr_load_ptr(&victim->critical_section) != evict) {
/*
* Easy case: victim isn't in a critical section with
* our lock. The victim has either been scheduled out
* since we called `ensure_cancel_sequence`, or it went
* through a context switch at least once. In either
* case, it has already observed the cancellation or
* will before the next critical section.
*/
return true;
}
#ifdef RLOCK_STATS
ck_pr_inc_64(&rlock_evict_slow);
#endif
/*
* The victim might be in the middle of a critical section.
* Send a signal that'll skip the critical section if
* necessary.
*/
ensure_signal_sequence(victim, sequence);
/*
* If the victim is definitely not running, it either has
* already executed the signal handler or will before resuming
* normal execution. If the victim might be running,
* we can only hope we got lucky.
*/
if (!victim_running(victim)) {
return true;
}
/*
* We know the victim was scheduled out before we signaled for
* cancellation. We can see if the victim has released our
* critical section at least once since then.
*/
return (ck_pr_load_ptr(&victim->critical_section) != evict);
}
void
rlock_owner_release(void)
{
struct rlock_owner *self = rlock_self;
rlock_owner_seq_t seq;
if (self == NULL) {
return;
}
seq.bits = self->seq.bits;
ck_pr_fas_32(&self->cancel_sequence, seq.sequence);
ck_pr_fas_32(&self->signal_sequence, seq.sequence);
ck_pr_fas_32(&self->acked_sequence, seq.sequence);
ck_pr_fence_store();
seq.sequence++;
ck_pr_fas_64(&self->seq.bits, seq.bits);
return;
}
rlock_owner_seq_t
rlock_lock(struct rlock *lock)
{
struct rlock_owner *self = get_self();
rlock_owner_seq_t seq, snapshot;
/* Load the current owner. */
snapshot.bits = ck_pr_load_64(&lock->owner.bits);
/* Easy case: we already own the lock. */
if (snapshot.bits == self->seq.bits) {
return self->seq;
}
for (;;) {
seq.bits = self->seq.bits;
/* Make sure the current owner isn't anymore. */
if (!rlock_owner_cancel(snapshot, lock)) {
/*
* Couldn't cancel the old owner. Either it
* was replaced while we were trying to cancel
* it, or it's still there.
*
* If ownership was just replaced, we can
* assume the current owner is still running.
* If the owner is the same, there's nothing
* we can do.
*
* In either case, we should try a different
* lock.
*/
seq.bits = 0;
return seq;
}
/* Replace the old owner with ourself. */
if (ck_pr_cas_64_value(&lock->owner.bits,
snapshot.bits, seq.bits, &snapshot.bits)) {
/* Success! */
break;
}
/* CAS failed. snapshot.bits has the new owner. */
/* Eagerly observe any cancellation. */
update_self(self);
/* CAS failed. Spin a bit. */
ck_pr_stall();
}
return seq;
}
rlock_owner_seq_t
rlock_current(void)
{
struct rlock_owner *self = get_self();
return self->seq;
}
rlock_owner_seq_t
rlock_trylock(struct rlock *lock)
{
struct rlock_owner *self = get_self();
rlock_owner_seq_t seq = { .bits = self->seq.bits };
rlock_owner_seq_t snapshot;
snapshot.bits = ck_pr_load_64(&lock->owner.bits);
if (rlock_owner_stale(snapshot) &&
ck_pr_cas_64(&lock->owner.bits, snapshot.bits, seq.bits)) {
return seq;
}
seq.bits = 0;
return seq;
}
bool
rlock_test(rlock_owner_seq_t snapshot, struct rlock *lock)
{
struct rlock_owner *self = (void *)((uintptr_t)snapshot.address * 64);
if (UNLIKELY(self == NULL ||
ck_pr_load_32(&self->cancel_sequence) == self->seq.sequence)) {
self = get_self();
}
return (self->seq.bits == snapshot.bits &&
ck_pr_load_64(&lock->owner.bits) == snapshot.bits);
}
bool
rlock_store_64(rlock_owner_seq_t snapshot,
struct rlock *lock, uint64_t *dst, uint64_t value)
{
struct rlock_owner *self = (void *)((uintptr_t)snapshot.address * 64);
rlock_owner_seq_t seq;
uint32_t op_count;
int status;
seq.bits = self->seq.bits;
op_count = ++self->op_count;
/* Our snapshot is stale: this ownership was already cancelled or released. */
if (UNLIKELY(seq.bits != snapshot.bits)) {
#ifdef RLOCK_STATS
ck_pr_inc_64(&rlock_evicted);
#endif
return false;
}
/* The handler will reset RAX to 1 on skip. */
status = 1;
asm volatile(
/* Move the lock's address in the critical section flag. */
"0: movq %[lock], %[critical_section]\n\t"
/* Do we still own the lock? */
"cmpq %[owner], %[snapshot]\n\t"
"jne 1f\n\t"
/* Were we asked to cancel? */
"cmpl %[cancelled], %[seq]\n\t"
"je 1f\n\t"
/* Success path! Set status to 0. */
"xorl %[status], %[status]\n\t"
#ifdef RLOCK_STATS
/* Artificially widen the critical section when gathering stats. */
"pause\n\t"
"pause\n\t"
"pause\n\t"
"pause\n\t"
"pause\n\t"
"pause\n\t"
"pause\n\t"
"pause\n\t"
"pause\n\t"
"pause\n\t"
#endif
/* Store the value in *dst. */
"movq %[value], %[dst]\n\t"
/* End of critical section. */
"1:\n\t"
/*
* Make sure the signal handler knows where the
* critical section code begins & ends.
*/
".pushsection rlock_store_list, \"a\", @progbits\n\t"
".quad 0b, 1b\n\t"
".popsection\n\t"
: [status] "+a"(status),
[critical_section] "+m"(self->critical_section),
[dst] "=m"(*dst)
: [lock] "r"(lock),
[snapshot] "r"(snapshot.bits),
[owner] "m"(lock->owner.bits),
[seq] "r"((uint32_t)seq.sequence),
[cancelled] "m"(self->cancel_sequence),
[value] "r"(value)
: "memory", "cc");
/* Clear the flag. */
ck_pr_store_ptr(&self->critical_section, NULL);
/* Acknowledge any cancellation request. */
if (UNLIKELY(status != 0)) {
update_self(self);
#ifdef RLOCK_STATS
ck_pr_inc_64(&rlock_evicted);
#endif
return false;
}
/* Force lock reacquisition after ~64K (OP_LIMIT) writes. */
if (UNLIKELY(op_count >= OP_LIMIT)) {
#ifdef RLOCK_STATS
/* Pre-compensate: the caller's next store with this now-stale snapshot will count as an eviction. */
ck_pr_dec_64(&rlock_evicted);
#endif
self->op_count = 0;
rlock_owner_release();
}
return true;
}
void
rlock_signal_handler(int signal, siginfo_t *info, void *arg)
{
ucontext_t *ctx = arg;
mcontext_t *mctx = &ctx->uc_mcontext;
struct rlock_owner *self = rlock_self;
uintptr_t rip;
size_t nloc = __stop_rlock_store_list - __start_rlock_store_list;
(void)signal;
(void)info;
rip = (uintptr_t)mctx->gregs[REG_RIP];
for (size_t i = 0; i < nloc; i++) {
struct rlock_store record;
record = __start_rlock_store_list[i];
if (rip < record.begin || rip >= record.end) {
continue;
}
assert(self != NULL);
#ifdef RLOCK_STATS
ck_pr_inc_64(&rlock_evicted_in_crit);
#endif
/* skip the critical instruction. */
mctx->gregs[REG_RIP] = record.end;
/* set the interrupted flag. */
mctx->gregs[REG_RAX] = 1;
return;
}
/* Might as well publish that we observed any cancellation request. */
if (self != NULL) {
ck_pr_fas_32(&self->acked_sequence,
ck_pr_load_32(&self->cancel_sequence));
}
return;
}
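/*
 * In brief: the inline asm in rlock_store_64 records each guarded store's
 * [begin, end) instruction range in the rlock_store_list section; when a
 * canceller signals the owner, the handler above checks whether the
 * interrupted RIP falls in one of those ranges and, if so, jumps to the end
 * of the range with RAX set to 1, so the store is skipped and
 * rlock_store_64 reports failure.
 */

/* Public interface (rlock.h, per the include guard below). */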
#ifndef RLOCK_H
#define RLOCK_H
#ifdef RLOCK_SIGNAL_HANDLER
# include <signal.h>
#endif
#include <stdbool.h>
#include <stdint.h>
#ifdef RLOCK_SIGNAL_HANDLER
# define RLOCK_SIGNAL SIGUSR1
#endif
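/*
 * An owner is named by its 64-byte-aligned struct address divided by 64
 * (42 bits) plus a 22-bit generation counter, so both fit in one 64-bit
 * word that can be loaded and CASed atomically.
 */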
typedef union rlock_owner_seq {
uint64_t bits;
struct {
uint64_t sequence:22;
uint64_t address:42;
};
} rlock_owner_seq_t;
struct rlock {
rlock_owner_seq_t owner;
struct rlock *next;
};
/* Get the owner to eventually release this sequence. */
void rlock_owner_signal(rlock_owner_seq_t);
/* Make the owner release this sequence & lock; return true on success. */
bool rlock_owner_cancel(rlock_owner_seq_t, struct rlock *evict);
/* Release the caller's sequence. */
void rlock_owner_release(void);
/*
* _seq.address == 0 on failure.
*/
rlock_owner_seq_t rlock_lock(struct rlock *);
rlock_owner_seq_t rlock_trylock(struct rlock *);
rlock_owner_seq_t rlock_current(void);
/*
* Return whether the lock is still owned by snapshot.
*/
bool rlock_test(rlock_owner_seq_t snapshot, struct rlock *lock);
/*
* *dst = value iff the caller still owns the rlock.
*/
bool rlock_store_64(rlock_owner_seq_t snapshot,
struct rlock *lock, uint64_t *dst, uint64_t value);
#ifdef RLOCK_SIGNAL_HANDLER
void rlock_signal_handler(int, siginfo_t *, void *);
#endif
#endif /* !RLOCK_H */
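A minimal usage sketch, not part of the gist itself, showing how the interface above is meant to be driven (the benchmark at the bottom does the same in a loop); increment_once is an illustrative name:

#include <ck_pr.h>
#include "rlock.h"

static bool
increment_once(struct rlock *lock, uint64_t *counter)
{
	rlock_owner_seq_t snapshot;
	uint64_t value;
	bool ok;

	/* Acquire (possibly by evicting a stale owner); address == 0 means failure. */
	snapshot = rlock_lock(lock);
	if (snapshot.address == 0) {
		return false;
	}
	/* Reads are not guarded; only the store below is revocable. */
	value = ck_pr_load_64(counter);
	/* The store takes effect iff we still own the lock and were not cancelled. */
	ok = rlock_store_64(snapshot, lock, counter, value + 1);
	rlock_owner_release();
	return ok;
}

On failure the caller should re-read under a fresh snapshot before retrying, as the benchmark below does when a store is revoked.

/* Thread-running heuristics; presumably rlock_process_status.c, going by the include below. */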
#define _GNU_SOURCE
#include "rlock_process_status.h"
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <sched.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
/* We're looking for a line of length < 128. */
#define LINE_MAX_LENGTH 128
static bool
sched_debug_find_tid(char *buf, pid_t tid)
{
const char needle[] = ".curr->pid";
char *cursor = buf;
/* look for r'^ *\.curr->pid *: [0-9]+' */
while ((cursor = strstr(cursor, needle)) != NULL) {
char *right_cursor;
/*
* Make sure it's all spaces to the left of cursor
* until we hit a newline.
*/
if (cursor == buf) {
goto next;
}
for (const char *left_cursor = cursor - 1;; left_cursor--) {
char curr = *left_cursor;
if (curr == '\n') {
break;
}
if (curr != ' ' || left_cursor == buf) {
goto next;
}
}
right_cursor = strchr(cursor, '\n');
if (right_cursor == NULL) {
goto next;
}
{
int r;
long long actual;
*right_cursor = '\0';
r = sscanf(cursor, " .curr->pid : %lld", &actual);
*right_cursor = '\n';
if (r == 1 && actual == tid) {
return true;
}
}
next:
cursor += strlen(needle);
}
return false;
}
static int
sched_debug_read(int fd, pid_t tid)
{
char buf[8192];
const ssize_t wanted = sizeof(buf) - 1;
size_t prefix = 0;
for (;;) {
size_t bufsize;
ssize_t r;
r = read(fd, &buf[prefix], wanted - prefix);
assert(r >= 0);
if (r == 0) {
break;
}
bufsize = (size_t)r + prefix;
buf[bufsize] = '\0';
if (sched_debug_find_tid(buf, tid)) {
return 1;
}
if (bufsize > LINE_MAX_LENGTH) {
memmove(&buf[0], &buf[bufsize - LINE_MAX_LENGTH],
LINE_MAX_LENGTH);
prefix = LINE_MAX_LENGTH;
} else {
prefix = bufsize;
}
}
return 0;
}
/*
* Return -1 if unknown, 0 if not running, 1 if maybe running.
*/
static int
sched_debug_parse(pid_t tid)
{
int fd;
int ret;
fd = open("/proc/sched_debug", O_RDONLY);
if (fd < 0) {
return -1;
}
ret = sched_debug_read(fd, tid);
close(fd);
return ret;
}
/*
* 0 if not running, 1 if maybe running.
*/
static int
stat_parse(int task_fd)
{
char buf[4096];
ssize_t r;
int own_processor, processor;
char status;
own_processor = sched_getcpu();
r = read(task_fd, buf, sizeof(buf) - 1);
/* If we can't read, the task must be gone. */
if (r <= 0) {
return 0;
}
buf[r] = '\0';
r = sscanf(buf,
"%*d " /* pid */
"%*s " /* comm */
"%c " /* status */
"%*d " /* ppid */
"%*d " /* pgrp */
"%*d " /* session */
"%*d " /* tty */
"%*d " /* tpgid */
"%*u " /* flags */
"%*u " /* minflt */
"%*u " /* cminflt */
"%*u " /* majflt */
"%*u " /* cmajflt */
"%*u " /* utime */
"%*u " /* stime */
"%*d " /* cutime */
"%*d " /* cstime */
"%*d " /* priority */
"%*d " /* nice */
"%*d " /* num_threads */
"%*d " /* itrealvalue */
"%*u " /* XXX start time %llu */
"%*u " /* vsize */
"%*d " /* rss */
"%*u " /* rsslim */
"%*u " /* startcode */
"%*u " /* endcode */
"%*u " /* startstack */
"%*u " /* kstkesp */
"%*u " /* kstkeip */
"%*u " /* signal */
"%*u " /* blocked */
"%*u " /* sigignore */
"%*u " /* sigcatch */
"%*u " /* wchan */
"%*u " /* nswap */
"%*u " /* cnswap */
"%*d " /* exit signal */
"%d " /* last processor */
"",
&status, &processor);
assert(r == 2);
/* Not runnable -> not running. */
if (status != 'R') {
return 0;
}
/* On the same processor as us -> pre-empted at least once. */
if (processor == own_processor && own_processor == sched_getcpu()) {
return 0;
}
return 1;
}
int
rlock_process_running(pid_t tid, uint64_t creation_ts)
{
char path[1024];
ssize_t r;
int task_fd;
int ret;
(void)creation_ts;
{
r = snprintf(path, sizeof(path),
"/proc/self/task/%i/stat", tid);
assert(r >= 0);
task_fd = open(path, O_RDONLY);
if (task_fd < 0) {
assert(errno == ENOENT);
return -1;
}
}
ret = stat_parse(task_fd);
/*
* sched_debug is really slow (sad that we only need a few
* lines out of it). Use it as a last resort.
*/
if (ret > 0 && sched_debug_parse(tid) == 0) {
ret = 0;
}
close(task_fd);
return ret;
}
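/* Process-status interface (rlock_process_status.h, per the include guard below). */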
#ifndef RLOCK_PROCESS_STATUS_H
#define RLOCK_PROCESS_STATUS_H
#include <stdbool.h>
#include <stdint.h>
#include <sys/types.h>
/*
* -1: tid/creation_ts pair does not exist anymore.
* 0: definitely was a moment when tid was not running.
* > 0: maybe running.
*/
int rlock_process_running(pid_t tid, uint64_t creation_ts);
#endif /* !RLOCK_PROCESS_STATUS_H */
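/* Benchmark driver: per-thread counter increments under rlock, plus uncontended and spinlock baselines. */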
#define _GNU_SOURCE
#include <assert.h>
#include <inttypes.h>
#include <math.h>
#include <pthread.h>
#include <sched.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <ck_pr.h>
#include "rlock.h"
#define MAX_THREADS 512
#define SECTION_COUNT 24
struct section {
struct rlock lock;
uint64_t counter;
} __attribute__((__aligned__(64)));
struct section sections[SECTION_COUNT];
static size_t pause_iter = 0;
#ifdef RLOCK_STATS
static size_t increments = 1000 * 1000 * 1000UL;
#else
static size_t increments = 100000 * 1000 * 1000UL;
#endif
static void
pause(size_t n)
{
for (size_t i = 0; i < n; i++) {
ck_pr_stall();
}
return;
}
static void *
increment(void *arg)
{
size_t completed = 0;
size_t failures = 0;
(void)arg;
while (completed < increments) {
struct section *dst;
union rlock_owner_seq snapshot = { .bits = 0 };
size_t cpu = sched_getcpu();
size_t i;
for (size_t i = 0; i < SECTION_COUNT; i++) {
dst = &sections[(i + cpu) % SECTION_COUNT];
snapshot = rlock_lock(&dst->lock);
if (snapshot.address != 0) {
break;
}
}
if (snapshot.address == 0) {
ck_pr_stall();
printf("stall\n");
continue;
}
for (i = 0; i < 1000000 && completed < increments; i++) {
uint64_t counter = ck_pr_load_64(&dst->counter);
#ifdef RLOCK_STATS
pause(pause_iter);
#else
(void)pause;
#endif
if (!rlock_store_64(snapshot, &dst->lock,
&dst->counter, counter + 1)) {
failures++;
break;
}
completed++;
}
rlock_owner_release();
}
//printf("Failed: %f\n", 1.0 * failures / (failures + completed));
return NULL;
}
static void *
increment_base(void *arg)
{
size_t completed = 0;
(void)arg;
while (completed < increments) {
struct section *dst = &sections[0];
ck_pr_fas_64(&dst->lock.owner.bits, 1);
for (size_t i = 0; i < 10000 && completed < increments; i++) {
uint64_t counter = ck_pr_load_64(&dst->counter);
ck_pr_store_64(&dst->counter, counter + 1);
completed++;
}
ck_pr_store_64(&dst->lock.owner.bits, 0);
}
//printf("Failed: %f\n", 1.0 * failures / (failures + completed));
return NULL;
}
static void *
increment_locked(void *arg)
{
size_t completed = 0;
(void)arg;
while (completed < increments) {
struct section *dst = &sections[0];
for (size_t i = 0; i < 10000 && completed < increments; i++) {
uint64_t counter;
while (ck_pr_fas_64(&dst->lock.owner.bits, 1) != 0) {
ck_pr_stall();
}
counter = ck_pr_load_64(&dst->counter);
ck_pr_store_64(&dst->counter, counter + 1);
ck_pr_cas_64(&dst->lock.owner.bits, 1, 0);
completed++;
}
}
//printf("Failed: %f\n", 1.0 * failures / (failures + completed));
return NULL;
}
extern uint64_t rlock_evict, rlock_evict_slow, rlock_evict_fail;
extern uint64_t rlock_evicted, rlock_evicted_in_crit;
int
main(int argc, char **argv)
{
pthread_t workers[MAX_THREADS];
ssize_t thread_count = 2;
void *(*worker)(void *) = increment;
if (argc > 1) {
double work_ratio = atof(argv[1]);
/* One increment is roughly 10 pause iterations of work. */
/* We want 10 / (10 + pause_iter) ~= work_ratio, */
/* i.e., pause_iter = 10 / work_ratio - 10. */
assert(work_ratio > 0 && work_ratio <= 1);
pause_iter = round((10 / work_ratio) - 10);
}
if (argc > 2) {
thread_count = atoi(argv[2]);
assert(thread_count >= -1);
assert(thread_count <= MAX_THREADS);
}
if (thread_count == 0) {
worker = increment_base;
thread_count = 1;
}
if (thread_count == -1) {
worker = increment_locked;
thread_count = 1;
}
assert(thread_count > 0);
increments /= (size_t)thread_count;
for (size_t i = 0; i < (size_t)thread_count; i++) {
int r;
r = pthread_create(&workers[i], NULL, worker, NULL);
assert(r == 0);
}
for (size_t i = 0; i < (size_t)thread_count; i++) {
int r;
r = pthread_join(workers[i], NULL);
assert(r == 0);
}
{
uint64_t sum = 0;
for (size_t i = 0; i < SECTION_COUNT; i++) {
printf("%s%" PRIu64"",
(i == 0) ? "" : " + ", sections[i].counter);
sum += sections[i].counter;
}
printf("; diff %"PRIu64"\n",
sum - (thread_count * increments));
}
#ifdef RLOCK_STATS
printf("%" PRIu64 ": %" PRIu64 " & %" PRIu64 "\n",
rlock_evict, rlock_evict_slow, rlock_evict_fail);
printf("%" PRIu64 ": %" PRIu64"\n",
rlock_evicted, rlock_evicted_in_crit);
#endif
return 0;
}