Skip to content

Instantly share code, notes, and snippets.

@brugeman
Created May 22, 2015 07:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brugeman/30070d4c309a17f21597 to your computer and use it in GitHub Desktop.
Save brugeman/30070d4c309a17f21597 to your computer and use it in GitHub Desktop.
SMR pointer storage, checked with the Relacy race detector.
#include <cstdio>
#include "relacy/relacy_std.hpp"
namespace smrp_test
{
// Single Writer Multiple Readers pointer storage.
//
// One writer thread publishes a pointer with 'set'; reader threads pin the
// current pointer with 'acquire' and unpin it with 'release'. Each reader
// owns one "hazard" slot in which it advertises the pointer it is using.
// 'set' does not return the previous pointer until no hazard slot holds it,
// so the caller may safely delete the returned value.
//
// Write latency is therefore bounded by the slowest reader, and writes are
// meant to happen very rarely.
//
// Template parameters:
//   T                 - pointee type; the store only handles T* values and
//                       never dereferences or deletes them itself.
//   READER_THREAD_NUM - number of hazard slots, i.e. the number of distinct
//                       reader indices accepted by 'acquire'/'release'.
template<typename T, size_t READER_THREAD_NUM>
class swmr_pointer_store_t
{
    static const size_t CACHE_LINE_SIZE = 64;
    typedef std::atomic<T *> aptr_t;

    // Atomic pointer padded up to CACHE_LINE_SIZE bytes so that neighbouring
    // slots are less likely to share a cache line (false sharing).
    // NOTE(review): padding fixes the size only, not the alignment.
    class nfs_aptr_t
    {
    public:
        nfs_aptr_t () : p (0) {}
        aptr_t p;
    private:
        // not copyable (declared, never defined)
        nfs_aptr_t (const nfs_aptr_t & other);
        nfs_aptr_t & operator= (const nfs_aptr_t & other);
        // Keep the array bound positive even if the atomic is already
        // as large as a cache line.
        char pad_[sizeof (aptr_t) < CACHE_LINE_SIZE
                  ? CACHE_LINE_SIZE - sizeof (aptr_t)
                  : 1];
    };

    // current value, modified only through 'set'
    nfs_aptr_t main_ptr_;
    // one slot per reader, written by 'acquire'/'release', scanned by 'set';
    // mutable because readers go through const member functions
    mutable nfs_aptr_t hazard_ptrs_[READER_THREAD_NUM];

    // Wait strategy used by the writer while the old value is still
    // pinned by a reader. Currently a no-op, i.e. the writer busy-spins.
    void wait ()
    {
        // std::this_thread::yield ();
    }

    // Not copyable and not movable (declared, never defined).
    // '= delete' is deliberately not used: Relacy redefines 'delete'
    // as a macro, which would break that syntax.
    swmr_pointer_store_t (const swmr_pointer_store_t & other);
    swmr_pointer_store_t & operator= (const swmr_pointer_store_t & other);

public:
    // Creates a store holding a null pointer and with all hazard slots empty.
    swmr_pointer_store_t ()
    {}

    // Returns the current pointer without pinning it.
    // Must only be called by the writer thread (or while no other thread
    // is running); 1 relaxed atomic load.
    T * get () const
    {
        return main_ptr_.p.load (rl::memory_order_relaxed);
    }

    // Writer thread calls this to replace the current value with 'ptr'.
    // Returns the previous value once no reader slot holds it any more,
    // so the caller may dispose of it. If the previous value is null or
    // equal to 'ptr', it is returned immediately without scanning readers.
    // Each scan is O(N) w/ N=READER_THREAD_NUM and is repeated (calling
    // 'wait' in between) for as long as some reader still pins the old value.
    T * set (T * ptr)
    {
        // replace old value with new value
        rl::var<T *> old;
        // seq_cst: publishes the contents pointed to by ptr (release part)
        // and places the exchange in the single total order together with
        // the readers' hazard stores / main pointer re-checks.
        VAR(old) = main_ptr_.p.exchange (ptr, rl::memory_order_seq_cst);
        // do we actually need to check readers?
        if (VAR(old) != 0 && ptr != VAR(old))
        {
            // now make sure old value is not in use
            bool locked = false;
            do
            {
                rl::var<T *> v = 0;
                // seq_cst loads: a reader whose re-check of the main pointer
                // still observed the old value has its hazard store ordered
                // before our exchange, hence before these loads, so we are
                // guaranteed to see that hazard. An acquire load would not
                // be constrained by that total order.
                for (size_t i = 0;
                     VAR(v) != VAR(old) && i < READER_THREAD_NUM;
                     ++i)
                    VAR(v) = hazard_ptrs_[i].p.load (rl::memory_order_seq_cst);
                // the old value is locked if it sits in one
                // of reader threads' slots
                locked = VAR(v) == VAR(old);
                // some reader is using our value?
                // ok, let's wait to let it unlock the value.
                if (locked)
                    wait ();
            } while (locked);
        }
        // ok, no readers use old value now
        return VAR(old);
    }

    // Readers use 'acquire' to get the current pointer and to notify the
    // writer that the value is in use until 'release' is called with the
    // same thread_index. thread_index must be < READER_THREAD_NUM and each
    // index is expected to be used by a single reader thread.
    // The call loops only if the writer changes the pointer between the
    // hazard publication and the re-check.
    T * acquire (const size_t thread_index) const
    {
        RL_ASSERT (thread_index < READER_THREAD_NUM);
        rl::var<T *> v;
        rl::var<T *> v1;
        do
        {
            // load the pointer
            VAR(v) = main_ptr_.p.load (rl::memory_order_acquire);
            // Publish it in our hazard slot. This store must not be
            // reordered after the re-check load below (store->load
            // ordering), which a release store does not guarantee;
            // seq_cst on both operations does.
            hazard_ptrs_[thread_index].p
                .store (VAR(v), rl::memory_order_seq_cst);
            // Re-check the main pointer to make sure the writer has not
            // performed 'set' between the previous two operations.
            VAR(v1) = main_ptr_.p.load (rl::memory_order_seq_cst);
            // if writer didn't change anything,
            // return our 'locked' value
        }
        while (VAR(v1) != VAR(v));
        // we don't wait on retry as writer
        // has already completed (or we wouldn't need to retry)
        return VAR(v);
    }

    // Readers use 'release' to notify the writer that the value obtained
    // from the last 'acquire' with this thread_index is no longer in use.
    // Wait-free: 1 atomic store. The release ordering makes the reader's
    // accesses to the pointee visible before the slot is seen empty.
    void release (const size_t thread_index) const
    {
        RL_ASSERT (thread_index < READER_THREAD_NUM);
        // notify writer that we're done
        hazard_ptrs_[thread_index].p.store (0, rl::memory_order_release);
    }
};
// Number of reader threads in the simulation; the suite adds one more
// thread (index 0) that acts as the writer.
static const size_t READERS_NUM = 1;

// Relacy test suite exercising swmr_pointer_store_t: thread 0 publishes a
// sequence of heap-allocated counters, every other thread repeatedly pins
// the current counter and checks that observed values never decrease.
struct race_test : rl::test_suite<race_test, READERS_NUM + 1>
{
    swmr_pointer_store_t<size_t, READERS_NUM> pointer;

    // executed in single thread before main thread function:
    // a freshly constructed store must hold a null pointer
    void before ()
    {
        RL_ASSERT (pointer.get () == 0);
    }

    // main thread function
    void thread (const unsigned int thread_index)
    {
        if (thread_index != 0)
        {
            // --- reader ---
            const size_t reader_index = thread_index - 1;
            RL_ASSERT (reader_index < thread_index);
            const size_t READS = 100;
            rl::var<size_t> highest_seen = 0;
            for (size_t round = 0; round < READS; ++round)
            {
                rl::var<size_t *> pinned;
                VAR(pinned) = pointer.acquire (reader_index);
                if (VAR(pinned))
                {
                    // the writer publishes 0, 1, 2, ... so what we read
                    // must form a non-decreasing sequence
                    RL_ASSERT (*VAR(pinned) >= VAR(highest_seen));
                    VAR(highest_seen) = *VAR(pinned);
                }
                pointer.release (reader_index);
            }
            return;
        }

        // --- writer ---
        const size_t WRITES = 100;
        rl::var<size_t *> fresh = 0;
        for (size_t seq = 0; seq < WRITES; ++seq)
        {
            VAR(fresh) = new size_t;
            *VAR(fresh) = seq;
            // 'set' hands the previous value back only once no reader
            // pins it, so it can be freed right away
            rl::var<size_t *> retired;
            VAR(retired) = pointer.set (VAR(fresh));
            delete VAR(retired);
        }
        // leave the store empty and free the last published value
        delete pointer.set (0);
    }

    // executed in single thread after main thread function:
    // the writer must have left the store empty
    void after()
    {
        RL_ASSERT (pointer.get () == 0);
    }

    // executed in single thread after every 'visible' action in main threads
    // disallowed to modify any state; nothing to check here
    void invariant()
    {
    }
};
// Runs the Relacy simulation of race_test with default parameters.
// Whatever rl::simulate returns is intentionally discarded here.
void test ()
{
    static_cast<void> (rl::simulate<race_test> ());
}
}; /* namespace smrp_test */
int
main ()
{
smrp_test::test ();
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment