Skip to content

Instantly share code, notes, and snippets.

@TurkeyMan TurkeyMan/guarded.d
Last active Oct 13, 2019

Embed
What would you like to do?
guarded.d
/**
 * RAII handle for a shared (read-only) lease on a guarded `T`.
 *
 * The handle converts implicitly to `const(T)*` through `alias get this`.
 * A default-initialised handle (`ReadLease!T()`) holds a null pointer and is
 * what the acquire functions return when no lease could be obtained.
 *
 * `leaseCounter` points at the owning lease counter, or is null when the
 * handle does not own a counter reference (failed acquire, or a re-entrant
 * lease handed out while the same thread already holds the write lease).
 */
struct ReadLease(T)
{
    /**
     * Postblit: a copy of a read lease is one more outstanding reader.
     *
     * Without this, a copied handle would release the same counter reference
     * twice (once per destructor) and corrupt the reader count. Readers are
     * tracked as a negative count, so registering a reader subtracts one.
     * The source handle keeps the counter negative for the whole copy, so no
     * write lease can be issued in between.
     */
    this(this)
    {
        import core.atomic : atomicOp;

        if (leaseCounter)
            atomicOp!"-="(*leaseCounter, 1);
    }

    /// Releases this handle's reader reference (no-op when `leaseCounter` is null).
    ~this() { releaseReadLease(this); }

    /// Returns: pointer to the leased object, or null for an empty lease.
    const(T)* get() @property return const pure nothrow @safe @nogc { return object; }

    alias get this;

private:
    const(T)* object;                 // leased object; null when the lease is empty/released
    shared(ptrdiff_t)* leaseCounter;  // counter to release on destruction; null => nothing to release
}
/**
 * RAII handle for an exclusive (read/write) lease on a guarded `T`.
 *
 * Converts implicitly to `T*` through `alias get this`. A default-initialised
 * handle (`WriteLease!T()`) carries a null pointer and represents "no lease".
 *
 * NOTE(review): the struct is copyable and has no postblit; a copy would run
 * `releaseWriteLease` twice on the same counter. Callers should treat it as
 * move-only — confirm whether copies should be disabled.
 */
struct WriteLease(T)
{
    /// Hands the lease back via `releaseWriteLease`.
    ~this()
    {
        releaseWriteLease(this);
    }

    /// Returns: mutable pointer to the leased object, or null for an empty lease.
    T* get() @property return pure nothrow @safe @nogc
    {
        return object;
    }

    alias get this;

private:
    T* object;                        // leased object; null when empty/released
    shared(ptrdiff_t)* leaseCounter;  // counter reset on release; null => nothing to release
}
/**
 * Wraps a `shared(T)` together with a lease counter and hands out
 * `ReadLease` / `WriteLease` handles that give access to the wrapped object.
 *
 * Params:
 *   T             = type of the protected object
 *   reentrantLock = forwarded to the acquire functions to select the
 *                   re-entrant (thread-id based) write-lease behaviour
 */
struct Guarded(T, bool reentrantLock = false)
{
nothrow @safe @nogc:

    /// Constructs the protected object from `args` (forwarded to `T`).
    this(Args...)(auto ref Args args)
    {
        import core.lifetime : forward;

        object = T(forward!args);
    }

    /// Asserts that no lease is still outstanding when the guard dies.
    ~this()
    {
        // check there are no outstanding leases and destruct `object`
        // TODO: if `object` is trivially destructible, then we can use relaxed instead of acquire
        assert(atomicLoad!(MemoryOrder.acq)(leaseCounter) == 0, "Guarded has dangling leases");
        // confirm that `object` can destruct; perhaps we need to cast away shared and `object.destroy()`?
        //...
    }

    alias object this;

    /// Direct reference to the wrapped `shared` object, bypassing the lease mechanism.
    ref shared(inout(T)) unguarded() inout @property return pure
    {
        return object;
    }

    /// ditto
    ref shared(inout(T)) unguarded() shared inout @property return pure
    {
        return object;
    }

    // TODO: read lease should be issuable for const object, except `leaseCounter` needs to be mutated! (classic `mutable`)

    /// Acquires a read lease, waiting (spin budget `spinCount`) until one is available.
    ReadLease!T read(int spinCount = 4096)()
    {
        return acquireReadLease!(true, reentrantLock, spinCount)(object, leaseCounter);
    }

    /// ditto
    ReadLease!T read(int spinCount = 4096)() shared
    {
        return acquireReadLease!(true, reentrantLock, spinCount)(object, leaseCounter);
    }

    /// Acquires the write lease, waiting (spin budget `spinCount`) until it is available.
    WriteLease!T write(int spinCount = 4096)()
    {
        return acquireWriteLease!(true, reentrantLock, spinCount)(object, leaseCounter);
    }

    /// ditto
    WriteLease!T write(int spinCount = 4096)() shared
    {
        return acquireWriteLease!(true, reentrantLock, spinCount)(object, leaseCounter);
    }

    /// Non-waiting read acquire; the result is an empty (null) lease on failure.
    ReadLease!T tryRead()
    {
        return acquireReadLease!(false, reentrantLock)(object, leaseCounter);
    }

    /// ditto
    ReadLease!T tryRead() shared
    {
        return acquireReadLease!(false, reentrantLock)(object, leaseCounter);
    }

    /// Non-waiting write acquire; the result is an empty (null) lease on failure.
    WriteLease!T tryWrite()
    {
        return acquireWriteLease!(false, reentrantLock)(object, leaseCounter);
    }

    /// ditto
    WriteLease!T tryWrite() shared
    {
        return acquireWriteLease!(false, reentrantLock)(object, leaseCounter);
    }

private:
    shared(T) object;               // the protected value
    shared(ptrdiff_t) leaseCounter; // 0 when no lease is outstanding (see the destructor assert)
}
// Exercises Guarded!int in both non-reentrant and reentrant configurations.
// (The unused import of the druntime-internal `core.internal.traits : AliasSeq`
// has been dropped; nothing in this test referenced it.)
unittest
{
    // --- non-reentrant guard ---
    {
        Guarded!(int, false) g;
        // static assert(!__traits(compiles, *g.unguarded == 10)); // can't read (because shared)
        // static assert(!__traits(compiles, *g.unguarded = 10)); // can't write (because shared)
        {
            // while the write lease is held, no further lease may be issued
            auto write = g.write();
            *write = 100;
            assert(*write == 100);
            assert(!g.tryWrite());
            assert(g.tryRead() is null);
        }
        {
            // read leases can be stacked, but block writers
            auto read = g.read();
            assert(read is cast(int*)&g.unguarded());
            assert(!g.tryWrite());
            auto read2 = g.read();
            assert(read2 is read);
            assert(*read2 == 100 && *read2 == *read);
        }
    }
    // --- reentrant guard ---
    {
        Guarded!(int, true) g;
        // static assert(!__traits(compiles, *g.unguarded == 10)); // can't read (because shared)
        // static assert(!__traits(compiles, *g.unguarded = 10)); // can't write (because shared)
        {
            // the thread holding the write lease may re-enter for read and write
            auto write = g.write();
            *write = 100;
            assert(*write == 100);
            assert(g.tryWrite());
            assert(g.write() !is null);
            assert(g.tryRead() !is null);
            assert(g.read());
        }
        {
            // holding only a read lease does not allow write re-entry
            auto read = g.read();
            assert(read is cast(int*)&g.unguarded());
            assert(!g.tryWrite());
            auto read2 = g.read();
            assert(read2 is read);
            assert(*read2 == 100 && *read2 == *read);
        }
    }
}
/**
 * Reference-counted handle to a heap `Payload` holding a `Guarded` object.
 *
 * Params:
 *   T             = type of the shared object
 *   reentrantLock = lease re-entrancy policy; forwarded to the embedded `Guarded`
 *
 * `Payload.refCount` is zero-based: a freshly made payload has `refCount == 0`
 * with one owner, and every additional owner adds one.
 */
struct SharedPtr(T, bool reentrantLock = false)
{
nothrow @safe @nogc:

    /// Shares ownership with `rh`; leaves this handle empty when `rh` is empty.
    this(U)(ref SharedPtr!(U, reentrantLock) rh)
        if (is(U* : T*))
    {
        if (!rh.payload)
            return;
        atomicOp!"+="(rh.payload.refCount, 1);
        payload = rh.payload;
    }

    /// Drops this handle's ownership; the last owner is responsible for the payload.
    ~this()
    {
        if (!payload)
            return;
        // refCount is zero-based, so this handle is the last owner when either
        //  - the count is already 0 (sole owner, no decrement needed), or
        //  - our decrement takes it below 0 (we raced with the other remaining owner).
        // A decrement that lands exactly on 0 means one owner is still alive,
        // so it must NOT trigger destruction.
        if (atomicLoad!(MemoryOrder.raw)(payload.refCount) == 0 || atomicOp!"-="(payload.refCount, 1) < 0)
        {
            // destroy payload...
            // what allocator to use?
        }
    }

    /// Returns: pointer to the shared object (bypassing leases), or null when empty.
    inout(shared(T))* get() inout @property return pure { return payload ? &payload.object.object : null; }
    /// ditto
    inout(shared(T))* get() shared inout @property return pure { return payload ? &payload.object.object : null; }

    /// Waiting read lease; the handle must be non-empty.
    ReadLease!T read() { assert(payload); return payload.object.read(); }
    /// ditto
    ReadLease!T read() shared { assert(payload); return payload.object.read(); }

    /// Waiting write lease; the handle must be non-empty.
    WriteLease!T write() { assert(payload); return payload.object.write(); }
    /// ditto
    WriteLease!T write() shared { assert(payload); return payload.object.write(); }

    /// Non-waiting read lease; an empty handle yields an empty lease.
    ReadLease!T tryRead() { return payload ? payload.object.tryRead() : ReadLease!T(); }
    /// ditto
    ReadLease!T tryRead() shared { return payload ? payload.object.tryRead() : ReadLease!T(); }

    /// Non-waiting write lease; an empty handle yields an empty lease.
    WriteLease!T tryWrite() { return payload ? payload.object.tryWrite() : WriteLease!T(); }
    /// ditto
    WriteLease!T tryWrite() shared { return payload ? payload.object.tryWrite() : WriteLease!T(); }

private:
    /// Control block: the guarded object plus the zero-based owner count.
    struct Payload
    {
        this(Args...)(auto ref Args args)
        {
            import core.lifetime : forward;
            object = T(forward!args);
        }

        // Propagate the re-entrancy policy; previously `Guarded!T` silently
        // ignored `reentrantLock` and always used the non-reentrant lock.
        Guarded!(T, reentrantLock) object;
        shared(ptrdiff_t) refCount;
    }

    shared(Payload)* payload;

    /// Adopts a freshly allocated payload (used by `makeShared`).
    this(Payload* payload) @trusted
    {
        this.payload = cast(shared)payload;
    }
}
/**
 * Allocates a new payload with `new`, constructing the contained `T` from
 * `args`, and returns the `SharedPtr` that owns it.
 *
 * Params:
 *   args = constructor arguments, forwarded to the payload
 * Returns: a `SharedPtr!(T, reentrantLock)` referring to the new payload.
 */
SharedPtr!(T, reentrantLock) makeShared(T, bool reentrantLock = false, Args...)(auto ref Args args)
{
    import core.lifetime : forward;

    alias Ptr = SharedPtr!(T, reentrantLock);

    auto block = new Ptr.Payload(forward!args);
    return Ptr(block);
}
// Smoke test for makeShared / SharedPtr: construct, read, write, read back.
// Each lease below is a temporary that is released at the end of its own
// statement; do not hoist them into named variables, or the following
// `write` / `read` would wait on a lease that is still held.
unittest
{
auto sp = makeShared!int(10);
// initial value is visible through a read lease
assert(*sp.read == 10);
// mutate through a short-lived write lease
*sp.write = 100;
// the new value is visible to a subsequent read lease
assert(*sp.read == 100);
}
private:
/**
 * Tries to obtain a read lease on `object`, using `leaseCounter` as the lock word.
 *
 * Counter encoding as used by this function:
 *   0   - no lease outstanding (the optimistic CAS swaps 0 -> -1)
 *   < 0 - read leases outstanding; each additional reader subtracts one
 *   > 0 - treated as "a write lease is held"; with `reentrantLock` the value
 *         is compared against `getThreadId()`
 *
 * Template params:
 *   wait          = when true, keep retrying until a lease is obtained;
 *                   when false, make a single attempt
 *   reentrantLock = when true, a thread whose id equals the counter value is
 *                   given a lease with a null counter (nothing to release)
 *   spinCount     = number of `pause()` spins before falling back to
 *                   `Thread.sleep(Duration.zero)`; 0 means always sleep
 *
 * Returns: a populated `ReadLease!T`, or `ReadLease!T()` (null) when
 *          `wait == false` and the single attempt did not succeed.
 *
 * NOTE(review): with `wait == false` the function also returns an empty lease
 * when only read leases are outstanding (counter < 0), because the
 * "add a reader" path is compiled only for `wait == true` — confirm intended.
 */
ReadLease!T acquireReadLease(bool wait, bool reentrantLock, uint spinCount = 0, T)(ref shared(const(T)) object, ref shared(ptrdiff_t) leaseCounter) nothrow @trusted @nogc
{
static if (wait)
alias compareExchange = cas; // casWeak!(acquire, relaxed)
else
alias compareExchange = cas; // cas!(acquire, relaxed)
static if (spinCount)
uint spinCounter = spinCount; // TODO: it would be nice to lazy-init this
start_over:
// optimistic test
// `count` receives the observed counter value when the CAS fails
ptrdiff_t count = 0;
if (compareExchange(&leaseCounter, &count, ptrdiff_t(-1)))
return ReadLease!T(cast(T*)&object, &leaseCounter);
// positive counter: handled as an outstanding write lease
if (count > 0)
{
static if (reentrantLock)
{
// re-entry by the thread recorded in the counter: hand out a lease
// with a null counter so that releasing it leaves the counter untouched
if (count == getThreadId())
return ReadLease!T(cast(const(T)*)&object, null);
}
static if (wait)
{
import core.time : Duration;
// also reached via goto from the reader-increment loop below
wait_for_write_release:
static if (spinCount)
{
if (spinCounter == 0)
{
// might want to implement a better backoff
// might want to yield first
// might want to sleep(1) to guarantee a wait period
Thread.sleep(Duration.zero);
}
else
{
--spinCounter;
pause();
}
}
else
{
// coarse wait defers to OS
// might want to yield first
// might want to sleep(1) to guarantee a wait period
Thread.sleep(Duration.zero);
}
goto start_over;
}
}
static if (!wait)
return ReadLease!T();
else
{
// read lease is already issued; we want to add a ref...
// on failure `cas` refreshes `count`, so the next attempt uses the new value
while (cas(&leaseCounter, &count, count - 1) == false)
{
// if all read leases were released and a write lease was re-issued while we weren't looking...
if (count > 0)
goto wait_for_write_release;
}
return ReadLease!T(cast(const(T)*)&object, &leaseCounter);
}
}
/**
 * Gives back one read lease and empties the handle.
 *
 * The handle's object pointer is always cleared. When the handle carries a
 * counter pointer, the counter is atomically incremented by one and the
 * pointer is cleared, so a second call on the same handle does nothing.
 */
void releaseReadLease(T)(ref ReadLease!T lease) pure nothrow @safe @nogc
{
    lease.object = null;

    auto counter = lease.leaseCounter;
    if (counter is null)
        return; // empty lease or counter-less lease: nothing to give back

    // TODO: should be relaxed!
    atomicOp!("+=")(*counter, 1);
    lease.leaseCounter = null;
}
/**
 * Tries to obtain the exclusive write lease on `object`, using `leaseCounter`
 * as the lock word.
 *
 * The lease is taken by swapping the counter from 0 to `tid`, where `tid` is
 * `getThreadId()` for re-entrant locks and the constant 1 otherwise.
 *
 * Template params:
 *   wait          = when true, retry until the lease is obtained; when false,
 *                   make a single attempt
 *   reentrantLock = when true, a thread that finds its own id in the counter
 *                   receives a lease with a null counter (nothing to release)
 *   spinCount     = number of `pause()` spins before falling back to
 *                   `Thread.sleep(Duration.zero)`; 0 means always sleep
 *
 * Returns: a populated `WriteLease!T`, or `WriteLease!T()` (null) when
 *          `wait == false` and the attempt failed.
 */
WriteLease!T acquireWriteLease(bool wait, bool reentrantLock, uint spinCount = 0, T)(ref shared(T) object, ref shared(ptrdiff_t) leaseCounter) nothrow @trusted @nogc
{
    static if (reentrantLock)
        ptrdiff_t tid = getThreadId();
    else
        ptrdiff_t tid = 1;

    static if (wait)
        alias compareExchange = cas; // casWeak!(acquire, relaxed)
    else
        alias compareExchange = cas; // cas!(acquire, relaxed)

    static if (reentrantLock)
    {
        // optimistic test
        ptrdiff_t count = 0;
        if (compareExchange(&leaseCounter, &count, tid))
            return WriteLease!T(cast(T*)&object, &leaseCounter);
        // check for lease re-entry
        if (count == tid)
            return WriteLease!T(cast(T*)&object, null);
    }
    else
    {
        // TODO: remove cast
        if (compareExchange(&leaseCounter, ptrdiff_t(0), tid))
            return WriteLease!T(cast(T*)&object, &leaseCounter);
    }

    static if (!wait)
        return WriteLease!T();
    else
    {
        import core.time : Duration;

        static if (spinCount)
        {
            // wait using spin-loop with lame backoff
            uint spinCounter = spinCount;
            // TODO: casWeak!(acquire, relaxed), remove size_t cast
            while (cas(&leaseCounter, ptrdiff_t(0), tid) == false)
            {
                if (spinCounter == 0)
                {
                    // might want to implement a better backoff
                    // might want to yield first
                    // might want to sleep(1) to guarantee a wait period
                    Thread.sleep(Duration.zero);
                }
                else
                {
                    // Spin phase only: the original brace-less `else` let
                    // `pause()` run after every `Thread.sleep` as well, unlike
                    // the equivalent loop in `acquireReadLease`.
                    --spinCounter;
                    pause();
                }
            }
        }
        else
        {
            // coarse wait defers to OS
            // might want to yield first
            // might want to sleep(1) to guarantee a wait period
            // TODO: casWeak!(acquire, relaxed), remove size_t cast
            while (cas(&leaseCounter, ptrdiff_t(0), tid) == false)
                Thread.sleep(Duration.zero);
        }
        return WriteLease!T(cast(T*)&object, &leaseCounter);
    }
}
/**
 * Gives back the write lease and empties the handle.
 *
 * The handle's object pointer is always cleared. When the handle carries a
 * counter pointer, the counter is reset to 0 with a release store and the
 * pointer is cleared, so a second call on the same handle does nothing.
 */
void releaseWriteLease(T)(ref WriteLease!T lease) pure nothrow @safe @nogc
{
    lease.object = null;

    auto counter = lease.leaseCounter;
    if (counter is null)
        return; // empty lease or counter-less (re-entrant) lease

    atomicStore!(MemoryOrder.rel)(*counter, ptrdiff_t(0));
    lease.leaseCounter = null;
}
/**
 * Returns the calling thread's id as a `ptrdiff_t`.
 *
 * The value is looked up through `Thread.getThis().id` and then kept in a
 * function-local `static`, so later calls return the stored value as long as
 * it is non-zero.
 */
ptrdiff_t getThreadId() nothrow @safe @nogc
{
    static ptrdiff_t cachedId = 0;

    // fast path: id already looked up
    if (cachedId != 0)
        return cachedId;

    ThreadID rawId = void;
    try
        rawId = Thread.getThis().id; // TODO: investigate why this would throw?!
    catch (Exception)
        assert(false, "Couldn't get thread id!");

    // NOTE: it is absolutely possible to support other platforms thread id specs
    assert(rawId <= ptrdiff_t.max, "This library is tuned for thread handles which are a positive integer.");

    cachedId = cast(ptrdiff_t)rawId;
    return cachedId;
}
// TODO: remove when core.atomic.pause is merged
// Spin-wait hint used by the lease acquire loops between retries.
// The body is a naked inline-asm function: `rep; nop` followed by an explicit
// `ret` (no compiler-generated prologue/epilogue because of `naked`).
// NOTE(review): `rep; nop` is the x86 encoding of the PAUSE instruction, so
// this presumably only builds for x86/x86_64 targets — confirm.
void pause() pure nothrow @nogc @safe
{
asm pure nothrow @nogc @trusted
{
naked;
rep; nop;
ret;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.