Skip to content

Instantly share code, notes, and snippets.

@pkhuong pkhuong/hp.c Secret

Last active Jul 7, 2020
Embed
What would you like to do?
Hazard pointer microbenchmarks
/*
* We wish to compare the overhead of various hazard pointer
* implementations on hazard pointer heavy workloads. We expect such
* workloads to look like pointer chasing: otherwise, we could
* probably do something smarter about SMR on the read side.
*
* That's why this microbenchmark shuffles a singly linked circular
* list in an area of memory that fits comfortably in L1, and measures
* the time it takes to traverse that list.
*
* We make sure the list fits in L1 because while memory latency can
* hide a lot of sins, we don't want our results to only hold when
* every acces misses cache.
*
* I wouldn't use the read-side implementations below in production
* code. I was more worried about getting the assembly code I
* expected than actually making sure all the language-level
* correctness requirements are satisfied.
*
* I built this with `gcc-9 -falign-functions=256 -march=native -mtune=native -O3 -W -Wall`.
*/
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/random.h>
#define LIKELY(X) __builtin_expect(!!(X), 1)
#define UNLIKELY(X) __builtin_expect(!!(X), 0)
#define NOINLINE __attribute__((__noinline__))
#define NUM_NODES (16384UL / sizeof(struct node))
/**
* See https://www.intel.com/content/dam/www/public/us/en/documents/white-papers/ia-32-ia-64-benchmark-code-execution-paper.pdf
*/
static inline uint64_t
cycles_begin(void)
{
uint32_t lo, hi;
asm volatile("cpuid\n\t"
"rdtsc"
: "=a"(lo), "=d"(hi) :: "%rbx", "rcx", "memory");
return ((uint64_t)hi << 32) + lo;
}
static inline uint64_t
cycles_end(void)
{
uint32_t lo, hi;
asm volatile("rdtscp\n\t"
"mov %%eax, %0\n\t"
"mov %%edx, %1\n\t"
"cpuid"
: "=r"(lo), "=r"(hi) :: "rax", "%rbx", "%rcx", "rdx", "memory");
return ((uint64_t)hi << 32) + lo;
}
struct node {
struct node *next;
uint64_t value;
};
static struct node nodes[NUM_NODES];
/**
* Shuffles a list of indices into `nodes` to construct a circular
* linked list in that array.
*/
static NOINLINE void
setup_nodes(unsigned int seed)
{
size_t indices[NUM_NODES];
for (size_t i = 0; i < NUM_NODES; i++)
indices[i] = i;
if (seed == 0)
getentropy(&seed, sizeof(seed));
srandom(seed);
/* Fisher-Yates shuffle. */
for (size_t i = 0; i < NUM_NODES; i++) {
size_t range = NUM_NODES - i;
size_t j, tmp;
j = i + (1.0 * range / RAND_MAX) * random();
tmp = indices[i];
indices[i] = indices[j];
indices[j] = tmp;
}
/*
* The indices array has the elements of the circular list in
* order, where each index points in the nodes array. Set up
* linkage, and write something into the value field.
*/
for (size_t i = 0; i < NUM_NODES; i++) {
struct node *cur = &nodes[indices[i]];
size_t next = indices[(i + 1) % NUM_NODES];
cur->value = i;
cur->next = &nodes[next];
}
/* Make sure the optimizer can't do anything weird here. */
asm("" : "+m"(nodes) :: "memory");
return;
}
static NOINLINE uint64_t
traverse_noop(size_t num_hops, size_t start)
{
(void)num_hops;
(void)start;
return 0;
}
static inline void *
frob_ptr(void *ptr, uint64_t value)
{
#if 0
(void)value;
return ptr;
#else
uintptr_t y = 0;
asm("" : "+r"(y));
return (char *)ptr + (value * y);
#endif
}
/**
* Traverses the linked list in `nodes` up to num_hops times, stopping
* whenever the `next` pointer is NULL (never actually happens).
*/
static NOINLINE uint64_t
traverse_baseline(size_t num_hops, size_t start)
{
struct node *head = &nodes[start];
uint64_t acc = 0;
for (size_t i = 0; i < num_hops; i++) {
uint64_t value;
if (head == NULL)
break;
value = head->value;
acc ^= value;
head = head->next;
head = frob_ptr(head, value);
}
return acc;
}
/**
* Hazard pointer-ed implementations will tend to unroll the loop
* twice for hand-over-hand locking. That could have a positive
* impact, so control for that.
*/
static NOINLINE uint64_t
traverse_unrolled(size_t num_hops, size_t start)
{
struct node *head = &nodes[start];
uint64_t acc = 0;
for (size_t i = 0; i < num_hops; i++) {
uint64_t value;
if (head == NULL)
break;
value = head->value;
acc ^= value;
head = head->next;
head = frob_ptr(head, value);
if (head == NULL || ++i >= num_hops)
break;
value = head->value;
acc ^= value;
head = head->next;
head = frob_ptr(head, value);
}
return acc;
}
/**
* Classic barrier-ed hazard pointer.
*/
struct hp_record {
void *volatile pin;
struct {
void **volatile cell;
union {
/* Generation counters are negative */
intptr_t pin_or_gen;
/* cell is positive. */
intptr_t cell_or_pin;
};
} help;
};
static inline void *
hp_read_explicit(struct hp_record *record, void **cell)
{
void *snapshot = *cell;
for (;;) {
void *copy = snapshot;
void *actual;
/*
* Use an atomic exchange to publish with a store-load
* fence. C11 atomic_exchange results in weird codegen.
*/
asm("xchg %0, %1" : "+r"(copy), "+m"(record->pin) :: "memory");
actual = *cell;
if (LIKELY(actual == snapshot))
return actual;
snapshot = actual;
}
}
static inline __attribute__((__always_inline__, __flatten__)) uint64_t
traverse_hp_generic(void *(*hp_read)(struct hp_record *, void **),
size_t num_hops, size_t start)
{
static struct hp_record backing[2];
struct hp_record *records = backing;
struct node *head = &nodes[start];
uint64_t acc = 0;
/* Let's pretend we published these records. */
asm volatile("" : "+r"(records) :: "memory");
for (size_t i = 0; i < num_hops; i++) {
uint64_t value;
if (head == NULL)
break;
value = head->value;
acc ^= value;
head = hp_read(&records[0], (void **)&head->next);
head = frob_ptr(head, value);
if (head == NULL || ++i >= num_hops)
break;
value = head->value;
acc ^= value;
head = hp_read(&records[1], (void **)&head->next);
head = frob_ptr(head, value);
}
return acc;
}
static NOINLINE uint64_t
traverse_explicit(size_t num_hops, size_t start)
{
return traverse_hp_generic(hp_read_explicit, num_hops, start);
}
static inline void *
hp_read_membarrier(struct hp_record *record, void **volatile cell)
{
for (;;) {
void *snapshot = *cell;
record->pin = snapshot;
/*
* We only need a compiler barrier: the slow path uses
* an asymmetric barrier.
*/
asm("" ::: "memory");
/*
* This load should be an acquire load, but I'm
* reading the asm.
*/
if (LIKELY(*cell == snapshot))
return snapshot;
}
}
static NOINLINE uint64_t
traverse_membarrier(size_t num_hops, size_t start)
{
return traverse_hp_generic(hp_read_membarrier, num_hops, start);
}
static inline void *
hp_read_movs(struct hp_record *record, void **cell)
{
void *volatile *protected = &record->pin;
const void *cell_copy = cell;
asm("movsq" : "+S"(cell_copy), "+D"(protected));
protected--;
return *protected;
}
static NOINLINE uint64_t
traverse_movs(size_t num_hops, size_t start)
{
return traverse_hp_generic(hp_read_movs, num_hops, start);
}
static inline void *
hp_read_movs_spec(struct hp_record *record, void **cell)
{
void *volatile *protected = &record->pin;
const void *cell_copy = cell;
void *ret = *cell;
bool eql;
asm("movsq" : "+S"(cell_copy), "+D"(protected) :: "memory");
/*
* Use inline asm to make the comparison opaque, we actually
* want to create a control dependency, since those are
* predicted, unlike data dependencies.
*/
asm("cmpq %1, %2" : "=@cce"(eql) : "r"(ret), "m"(protected[-1]));
if (LIKELY(eql))
return ret;
/*
* We want redundant EAs for protected - 1, since this slow
* path should be extremely unlikely.
*/
asm("" : "+r"(protected));
return protected[-1];
}
static NOINLINE uint64_t
traverse_movs_spec(size_t num_hops, size_t start)
{
return traverse_hp_generic(hp_read_movs_spec, num_hops, start);
}
static NOINLINE void *
hp_read_movs_spec_call(struct hp_record *record, void **cell)
{
void *volatile *protected = &record->pin;
const void *cell_copy = cell;
void *ret = *cell;
bool eql;
asm("movsq" : "+S"(cell_copy), "+D"(protected) :: "memory");
/*
* Use inline asm to make the comparison opaque, we actually
* want to create a control dependency, since those are
* predicted, unlike data dependencies.
*/
asm("cmpq %1, %2" : "=@cce"(eql) : "r"(ret), "m"(protected[-1]));
if (LIKELY(eql))
return ret;
/*
* We want redundant EAs for protected - 1, since this slow
* path should be extremely unlikely.
*/
asm("" : "+r"(protected));
return protected[-1];
}
static NOINLINE uint64_t
traverse_movs_spec_call(size_t num_hops, size_t start)
{
return traverse_hp_generic(hp_read_movs_spec_call, num_hops, start);
}
static inline void *
hp_read_wf(struct hp_record *record, void **cell)
{
/* This counter should be associated with a (set of) records. */
static intptr_t gen_alloc = INTPTR_MIN;
intptr_t gen = ++gen_alloc;
void *read;
record->help.cell = cell;
/*
* Once our store to pin_or_gen is visible, target must also
* be visible.
*
* atomic_store messes with the codegen, and we only need a
* compiler barrier on x86.
*/
asm("" ::: "memory");
record->help.pin_or_gen = gen;
asm("" ::: "memory"); /* matches first membarrier */
read = *cell;
record->pin = read;
asm("" ::: "memory"); /* matches second membarrier */
/*
* If we can observe that someone helped us forward, we must
* use that helped value: we might otherwise resurrect a
* pointer that's been considered dead.
*/
if (UNLIKELY(record->help.pin_or_gen != gen)) {
asm("":::"memory");
/* Force a branch + read for the unlikely path. */
read = (void *)record->help.pin_or_gen;
}
return read;
}
__attribute__((__used__)) void *
hp_read_wf_extern(struct hp_record *record, void **cell)
{
return hp_read_wf(record, cell);
}
static NOINLINE uint64_t
traverse_wf(size_t num_hops, size_t start)
{
return traverse_hp_generic(hp_read_wf, num_hops, start);
}
static inline void *
hp_read_swf(struct hp_record *record, void **cell)
{
void *read;
record->help.cell_or_pin = (intptr_t)cell;
asm("" ::: "memory"); /* matches first membarrier */
read = *cell;
record->pin = read;
asm("" ::: "memory"); /* matches second membarrier */
/*
* If we can observe that someone helped us forward, we must
* use that helped value: we might otherwise resurrect a
* pointer that's been considered dead.
*/
if (UNLIKELY(
record->help.cell_or_pin != (intptr_t)cell)) {
asm("":::"memory");
/* Force a branch + read for the unlikely path. */
read = (void *)-record->help.cell_or_pin;
}
return read;
}
__attribute__((__used__)) void *
hp_read_swf_extern(struct hp_record *record, void **cell)
{
return hp_read_swf(record, cell);
}
static NOINLINE uint64_t
traverse_swf(size_t num_hops, size_t start)
{
return traverse_hp_generic(hp_read_swf, num_hops, start);
}
#define NUM_REP (1000 * 1000)
static double timings[NUM_REP];
static int
cmp_double(const void *vx, const void *vy)
{
const double *x = vx;
const double *y = vy;
if (*x == *y)
return 0;
return (*x < *y) ? -1 : 1;
}
static NOINLINE void
timeit(const char *name, uint64_t (*fn)(size_t, size_t), size_t num_hops)
{
for (size_t i = 0; i < NUM_REP; i++) {
uint64_t begin, end;
uint64_t ret;
begin = cycles_begin();
ret = fn(num_hops, i % NUM_NODES);
asm volatile("" : "+r"(ret) :: "memory");
end = cycles_end();
timings[i] = end - begin;
}
if (name == NULL)
return;
for (size_t i = 0; i < NUM_REP; i++)
printf("%s\t%zu\t%f\n", name, num_hops, timings[i]);
qsort(timings, NUM_REP, sizeof(timings[0]), cmp_double);
fprintf(stderr, "%s\t%zu\t%f\t%f\t%f\n",
name, num_hops,
timings[NUM_REP / 1000],
timings[NUM_REP / 2],
timings[NUM_REP - NUM_REP / 1000]);
return;
}
int
main()
{
size_t num_hops = 1000;
setup_nodes(0);
for (size_t i = 0; i < 2; i++) {
#define T(suffix) timeit((i == 0) ? NULL : #suffix, traverse_##suffix, num_hops)
T(noop);
T(baseline);
T(unrolled);
T(explicit);
T(membarrier);
T(movs);
T(movs_spec);
T(movs_spec_call);
T(wf);
T(swf);
#undef T
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.