Skip to content

Instantly share code, notes, and snippets.

@rjmcguire
Last active January 31, 2017 06:06
Show Gist options
  • Save rjmcguire/d92165d32d51b2eef58d9679c3adbacb to your computer and use it in GitHub Desktop.
Save rjmcguire/d92165d32d51b2eef58d9679c3adbacb to your computer and use it in GitHub Desktop.
little lockfree size_t[size_t] map _experiments_
module lockfree;
import std.stdio;
import per_thread_id;
/**
 * Fixed-capacity map from non-zero `size_t` keys to non-zero `size_t` values.
 *
 * Entries live in a flat array of `SIZE` slots that is scanned linearly; a key
 * of 0 marks an empty slot. Slots are claimed and released with
 * compare-and-swap on the key field.
 */
shared class Map(int SIZE) {
    import core.atomic;

    /**
     * Stores `value` under `key` in the first slot that already holds `key`
     * or that can be claimed while empty.
     *
     * Params:
     *   key           = non-zero key
     *   value         = non-zero value
     *   contended     = caller-owned counter, bumped every time a claim attempt fails
     *   maxContension = give up once `contended` reaches this number
     *
     * Returns: true when the value was stored, false when the claim was
     *          abandoned because of contention.
     * Throws: Exception ("out of space") when every slot holds another key;
     *         enforce fails when `key` or `value` is 0.
     */
    bool set(size_t key, size_t value, ref size_t contended, size_t maxContension=3) {
        import std.exception;
        enforce(key != 0 && value != 0);
        foreach (slot; 0 .. SIZE) {
            auto seen = atomicLoad(_data[slot].key);
            if (seen != key) {
                // Occupied by a different key: keep scanning.
                if (seen != 0L) continue;
                // The slot looked empty: keep trying to claim it until the
                // contention budget runs out.
                while (!cas(&_data[slot].key, 0L, key)) {
                    ++contended;
                    if (contended >= maxContension) {
                        return false;
                    }
                }
            }
            atomicStore(_data[slot].value, value);
            return true;
        }
        throw new Exception("out of space");
    }

    /**
     * Takes the entry stored under `key` out of the map.
     *
     * Returns: the stored value, or 0 when the key was not found or the key
     *          field changed before this call could clear it.
     */
    size_t get(size_t key) {
        import std.exception;
        enforce(key != 0);
        foreach (slot; 0 .. SIZE) {
            // key is non-zero here, so this also skips empty slots
            if (atomicLoad(_data[slot].key) != key) continue;
            auto found = atomicLoad(_data[slot].value);
            // Releasing the slot is what "takes" the entry.
            return cas(&_data[slot].key, key, 0L) ? found : 0;
        }
        return 0;
    }

    // No key may appear twice in the run of slots before the first empty one.
    invariant {
        int[size_t] seenKeys;
        foreach (slot; 0 .. SIZE) {
            auto k = atomicLoad(_data[slot].key);
            if (k == 0) break;
            assert(k !in seenKeys);
            seenKeys[k] = 1;
        }
    }

private:
    Entry[SIZE] _data;

    struct Entry {
        size_t key,value;
    }
}
/// Builds one shared 100000-slot map and starts a single worker thread on it;
/// the additional workers are left disabled.
void main() {
    import core.thread;
    auto table = new shared Map!100000;
    auto worker = new TestThread!table(1, 5000);
    worker.start();
    //new TestThread!table(5001, 10000).start();
    //new TestThread!table(10001, 15000).start();
    //new TestThread!table(15001, 20000).start();
    //new TestThread!table(20001, 25000).start();
}
import core.thread : Thread;
/**
 * Worker thread that inserts every key in `min .. max` (value equal to key)
 * into `map`, sleeps ten seconds, then reads every key back and asserts that
 * the value matches.
 */
class TestThread(alias map) : Thread {
    size_t min, max;

    this(size_t min, size_t max) {
        super(&run);
        this.min = min;
        this.max = max;
    }

    /// Thread body; anything thrown (including assertion failures) is printed.
    void run() {
        import std.datetime : seconds;
        import std.format;
        try {
            size_t contension;
            for (size_t key = min; key < max; ++key) {
                contension = 0;
                // Retry until the map accepts the entry, logging every refusal.
                while (!map.set(key, key, contension)) {
                    writeln("contended:", key, " ", key, " contension:", contension);
                }
            }
            Thread.sleep(10.seconds);
            for (size_t key = min; key < max; ++key) {
                auto v = map.get(key);
                assert(v == key, "map[%s] != %s".format(v, key));
            }
        } catch (Throwable e) {
            writeln(e);
        }
    }
}
module range;
import std.stdio;
import std.format;
import per_thread_id;
import std.exception : enforce;
/**
 * Fixed-capacity map from non-zero `size_t` keys to `size_t` values, kept in
 * a flat array of `SIZE` slots. A key of 0 marks an empty slot; slots are
 * claimed and released with compare-and-swap on the key field.
 */
shared class Map(int SIZE) if (SIZE > 0) {
    import core.atomic;

    /// Counts the slots whose key is currently non-zero.
    typeof(SIZE) length() {
        typeof(SIZE) used;
        foreach (slot; 0 .. SIZE) {
            if (atomicLoad(_data[slot].key) != 0) {
                ++used;
            }
        }
        return used;
    }

    /**
     * Stores `value` under `key` in the first claimable slot found after the
     * highest occupied slot.
     *
     * Returns: (index of the slot + 1). Do not use frivolously.
     * Throws: `noSpaceException` when no slot could be claimed; enforce fails
     *         when `key` is 0.
     */
    typeof(SIZE) set(size_t key, size_t value) {
        import std.exception;
        enforce(key != 0);// && value != 0); have to allow 0 value insertion for when the string calc hasn't been completed but we need the slot
        // Walk backwards from the last slot; stop just past the first occupied
        // slot met. Slot 0 itself is never inspected by this walk.
        int i = SIZE-1;
        while (i > 0) {
            if (atomicLoad(_data[i].key) != 0) {
                ++i;
                break;
            }
            --i;
        }
        // From there, scan forward for a slot we can take.
        for (; i < SIZE; ++i) {
            auto seen = atomicLoad(_data[i].key);
            // TODO: static if (Mode.NoOverwrites) {
            assert(seen != key, "setting same key twice!");
            // } // end TODO
            // Usable when it already carries our key, or when it was empty and
            // we won the race to claim it; otherwise try the next slot.
            immutable claimed = seen == key
                || (seen == 0L && cas(&_data[i].key, 0L, key));
            if (!claimed) {
                continue;
            }
            atomicStore(_data[i].value, value);
            version (None) writefln("%s acquired %d", id, i);
            return i + 1;
        }
        throw noSpaceException;
    }

    /**
     * Takes the entry stored under `key` out of the map.
     *
     * Returns: the stored value, or 0 when the key was not found or the key
     *          field changed before this call could clear it.
     */
    size_t get(size_t key) {
        import std.exception;
        enforce(key != 0);
        foreach (slot; 0 .. SIZE) {
            // key is non-zero here, so this also skips empty slots
            if (atomicLoad(_data[slot].key) != key) continue;
            auto found = atomicLoad(_data[slot].value);
            return cas(&_data[slot].key, key, 0L) ? found : 0;
        }
        return 0;
    }

    /**
     * Returns a foreach-able view over (key, value) pairs. Iteration stops at
     * the first slot whose key or value is 0, or when the loop body breaks.
     */
    auto asArray() {
        auto self = this;
        struct Ret {
            int opApply(int delegate(size_t key, size_t value) dg) {
                int rc;
                foreach (slot; 0 .. SIZE) {
                    auto k = atomicLoad(self._data[slot].key);
                    if (k == 0) break;
                    auto v = atomicLoad(self._data[slot].value);
                    if (v == 0) break;
                    rc = dg(k, v);
                    if (rc != 0) break;
                }
                return rc;
            }
        }
        return Ret();
    }

    // A duplicate-key invariant used to live here; it is disabled because the
    // test is invalid in a case where the keys can change.

private:
    Entry[SIZE] _data;

    struct Entry {
        size_t key,value;
    }

    Exception noSpaceException = new shared Exception("no space");
}
/**
 * Queue-like layer over `Map`: producers `put` values, a consumer drains them
 * through the input-range primitives `empty` / `front` / `popFront`.
 *
 * A slot is claimed through `set` with the caller's `id` as a temporary key;
 * `put` then rewrites the key to (slot index + 1).
 */
shared class Range(int SIZE) : Map!SIZE if (SIZE > 0) {
    import core.atomic;

    /**
     * Stores `value` in a free slot, sleeping and retrying while the map is full.
     *
     * Throws: the "no space" exception once 100 retries have failed; any other
     *         exception from `set` is rethrown immediately.
     */
    void put(size_t value) {
        size_t nospace;
        while (true) {
            try {
                size_t count = this.set(id, value);
                // set returns index+1; replace the temporary id key with it
                atomicStore(this._data[count-1].key, count);
                break;
            } catch (shared Exception e) {
                if (e !is noSpaceException || nospace>=100) {
                    throw e;
                }
                auto wait = backoffFor(nospace);
                writeln("no space:", ++nospace, " wait:", wait);
                Thread.sleep(wait);
            }
        }
    }

    bool _closed;

    /// Marks the range as closed so that a drained range reports `empty`.
    void close() {
        atomicStore(_closed, true);
    }

    /// True only after the first successful pop, when no value is held and
    /// `close` has been called.
    bool empty() {
        if (!atomicLoad(_primed)) {
            return false;
        }
        return atomicLoad(_front)==0 && atomicLoad(_closed);
    }

    private bool _primed;
    private size_t _front;

    /// Returns the current value, fetching the first one if nothing has been
    /// popped yet.
    size_t front() {
        // prime popFront if needed
        if (!_primed) {
            popFront;
        }
        return _front;
    }

    /**
     * Fetches the next value into `_front`, sleeping between attempts until a
     * value arrives or the range is closed.
     */
    void popFront() {
        // NOTE(review): this counter is only advanced by the disabled debug
        // line below, so the wait currently stays at the shortest step.
        size_t nospace;
        _front = 0;
        do {
            _popFront();
            if (_front != 0) {
                return;
            }
            auto wait = backoffFor(nospace);
            //writeln("no value:", ++nospace, " wait:", wait);
            Thread.sleep(wait);
        } while (_front==0 && !_closed);
    }

    /**
     * Single pass over the slots: takes the first occupied slot this caller
     * manages to claim and publishes its value in `_front`. Leaves `_front`
     * at 0 when nothing could be claimed.
     */
    void _popFront() {
        _front = 0;
        for (int i=0; i<SIZE; i++) {
            auto peek = atomicLoad(_data[i].key);
            if (peek == 0) {
                continue;
            }
            // Read into a local: the value must only be published once the
            // slot is really ours. Writing `_front` before the cas left a
            // stale value behind when the cas lost, which popFront then
            // returned even though the slot was never claimed.
            // NOTE(review): this spin never ends if the slot's value stays 0.
            size_t value;
            do {
                value = atomicLoad(_data[i].value);
            } while (value == 0);
            if (cas(&_data[i].key, peek, id)) {
                _front = value;
                atomicStore(_data[i].value, 0L);
                atomicStore(_data[i].key, 0L);
                _primed = true;
                return;
            }
        }
    }

    /// Sleep duration for the given number of failed attempts: 10us below 5,
    /// 20us below 10, 100ms below 25, then 10ms per attempt.
    private static auto backoffFor(size_t attempts) {
        import std.datetime;
        if (attempts >= 25) {
            return 10.msecs * attempts;
        }
        if (attempts >= 10) {
            return 100.msecs;
        }
        if (attempts >= 5) {
            return 20.usecs;
        }
        return 10.usecs;
    }
}
/*
import core.thread;
void main() {
auto r = new shared Range!10();
//new TestThread!r(10*2).start;
new TestThread!r(10).start;
thread_joinAll();
writefln("processed: len:%d", r.length);
foreach (v; r) {
writefln("popped:%s", v);
}
writefln("done: len:%d", r.length);
}
class TestThread(alias range) : Thread {
int max;
this(int max) {
super(&run);
this.max = max;
}
void run() {
int n = 1;
foreach (i; n .. max+1) {
range.put(i);
}
foreach (k, v; range.asArray) {
version(None)writefln("%d, %d, %d", k, v, n);
assert(k == v && v == n++);
}
}
}
*/
import core.thread;
/// Runs two producer threads against a 20-slot range, busy-waits until both
/// have signalled completion, closes the range and starts one consumer.
void main() {
    auto queue = new shared Range!20();
    //new TestThread!queue(10*2).start;
    Done pending;

    // One increment per producer; each producer decrements when it finishes.
    ++pending;
    auto firstProducer = new TestThread2!(queue, pending)(10, 1);
    firstProducer.start();
    ++pending;
    auto secondProducer = new TestThread2!(queue, pending)(10, 200);
    secondProducer.start();

    // Spin until the counter is back at zero.
    for (;;) {
        if (pending) break;
    }

    queue.close();
    writefln("processed: len:%d", queue.length);
    auto consumer = new TestThread3!(queue)(10);
    consumer.start;
    //foreach (v; queue) {
    // writefln("popped:%s", v);
    //}
    writefln("mainexit: len:%d", queue.length);
    thread_joinAll();
}
/// Producer thread: puts the `max` consecutive integers starting at `n` into
/// `range`, then decrements `done`.
class TestThread2(alias range, alias done) : Thread {
    int max, n;

    this(int max, int n) {
        super(&run);
        this.max = max;
        this.n = n;
        //writeln("inserting ", max);
    }

    void run() {
        immutable stop = n + max;
        for (int value = n; value < stop; ++value) {
            //writeln("put: ", value);
            range.put(value);
        }
        --done;
    }
}
/// Consumer thread: drains `range`, printing each value next to the caller's
/// `id`, and reports the length before and after.
class TestThread3(alias range) : Thread {
    int max;

    this(int max) {
        super(&run);
        this.max = max;
    }

    void run() {
        try {
            writefln("reading len:%d", range.length);
            // Spelled-out form of `foreach (v; range)`.
            for (; !range.empty; range.popFront()) {
                auto v = range.front;
                writefln("%s popped:%s", id, v);
            }
            writefln("done: len:%d", range.length);
        } catch (Exception e) {
            writeln(e);
        }
    }
}
/**
 * Atomic counter of outstanding work items.
 *
 * `++` registers one item, `--` retires one, and casting to `bool` yields
 * true once the count is back at zero.
 */
struct Done {
    import core.atomic;
    private shared size_t n;

    /// Atomically adjusts the counter and returns its new value. The result
    /// comes from the atomic operation itself instead of a second,
    /// non-atomic read of the shared field.
    auto opUnary(string op)() if (op == "++" || op == "--") {
        static if (op == "++") {
            return atomicOp!"+="(n, 1);
        } else {
            return atomicOp!"-="(n, 1);
        }
    }

    /// True when no items are outstanding.
    auto opCast(T)() if (is(T==bool)) {
        return atomicLoad(n)==0;
    }
}
#!/usr/bin/env rdmd
// compile with -O -inline -m64 on i7.2600k
/*
% ./lockfreestringarray.d
st:8us, mt:26808us, st/mt:3012
st:75us, mt:27877us, st/mt:368
st:66us, mt:28094us, st/mt:424
st:20us, mt:29277us, st/mt:1442
% ./lockfreestringarray
st:77us, mt:28260us, st/mt:362
st:63us, mt:28844us, st/mt:457
st:90us, mt:29319us, st/mt:325
st:57us, mt:29640us, st/mt:514
% time ./lockfreestringarray
st:188us, mt:28246us, st/mt:150
st:26us, mt:28601us, st/mt:1079
st:58us, mt:29551us, st/mt:503
st:82us, mt:30232us, st/mt:366
*/
module lockfreestringarray;
import std.stdio;
import std.format;
import per_thread_id;
import std.exception : enforce;
/**
 * Fixed-capacity map from non-zero `size_t` keys to `size_t` values, kept in
 * a flat array of `SIZE` slots. A key of 0 marks an empty slot; slots are
 * claimed and released with compare-and-swap on the key field.
 */
shared class Map(int SIZE) if (SIZE > 0) {
    import core.atomic;

    /// Index of the first empty slot, or `SIZE` when every slot is occupied.
    typeof(SIZE) length() {
        for (int i=0; i<SIZE; i++) {
            // The former `i<=0` / `i>=0` pair of checks was a tautology:
            // together they only ever meant "this slot is empty".
            if (atomicLoad(_data[i].key) == 0) {
                return i;
            }
        }
        return SIZE;
    }

    /**
     * Stores `value` under `key` in the first slot that can be claimed.
     *
     * Returns: (index of the slot + 1). Do not use frivolously.
     * Throws: Exception ("out of space") when no slot could be claimed;
     *         enforce fails when `key` is 0.
     */
    typeof(SIZE) set(size_t key, size_t value) {
        import std.exception;
        enforce(key != 0);// && value != 0); have to allow 0 value insertion for when the string calc hasn't been completed but we need the slot
        for (int i=0; i<SIZE; i++) {
            auto peek = atomicLoad(_data[i].key);
            // TODO: static if (Mode.NoOverwrites) {
            assert(peek != key, "setting same key twice!");
            // } // end TODO
            if (peek != key) {
                if (peek != 0L) continue;
                if (!cas(&_data[i].key, 0L, key)) {
                    continue; // position stolen try next
                }
            }
            atomicStore(_data[i].value, value);
            version (None) writefln("%s acquired %d", id, i);
            return i + 1;
        }
        throw new Exception("out of space");
    }

    /**
     * Takes the entry stored under `key` out of the map.
     *
     * Returns: the stored value, or 0 when the key was not found or the key
     *          field changed before this call could clear it.
     */
    size_t get(size_t key) {
        import std.exception;
        enforce(key != 0);
        for (int i=0; i<SIZE; i++) {
            auto peek = atomicLoad(_data[i].key);
            if (peek == 0) continue;
            if (peek == key) {
                auto value = atomicLoad(_data[i].value);
                if (!cas(&_data[i].key, key, 0L)) {
                    return 0; // failed to take key
                } else {
                    return value;
                }
            }
        }
        return 0;
    }

    /// foreach support over (key, value) pairs; stops at the first slot whose
    /// key or value is 0, or when the loop body breaks.
    int opApply(int delegate(size_t key, size_t value) dg) {
        int n;
        for (int i=0; i<SIZE; i++) {
            auto peek = atomicLoad(_data[i].key);
            if (peek == 0) {
                break;
            }
            auto v = atomicLoad(_data[i].value);
            if (v == 0) {
                break;
            }
            n = dg(peek, v);
            if (n != 0) {
                return n;
            }
        }
        return n;
    }

    // No key may stay duplicated in the run of slots before the first empty
    // one. A duplicate that disappears on re-check is only reported.
    invariant {
        typeof(SIZE)[size_t]dups;
        for (int i=0; i<SIZE; i++) {
            auto v = atomicLoad(_data[i].key);
            if (v == 0) break;
            if (v in dups) {
                // check if the key we had previously found is still the same
                if (atomicLoad(_data[dups[v]].key) == v) {
                    assert(false, "somehow we have two keys that are the same!");
                }
                writeln("key was the same for a moment (means that another thread was busy inserting)");
            }
            dups[v] = i;
        }
    }

private:
    Entry[SIZE] _data;

    struct Entry {
        size_t key,value;
    }
}
/**
 * Append-only array of strings packed into one character buffer of
 * `MAX_TOTAL_LENGTH` bytes.
 *
 * A `Map!SIZE` serves as the index: once an append completes, the slot's key
 * holds the string's start offset in the buffer and its value holds the
 * string's length.
 */
shared class StringArray(int SIZE, size_t MAX_TOTAL_LENGTH) if (SIZE > 0) {
    import core.atomic;
    private shared Map!SIZE store;

    this() {
        store = new shared Map!SIZE;
        _data.length = MAX_TOTAL_LENGTH;
    }

    /**
     * Appends `value` to the array.
     *
     * Params:
     *   value = non-empty string shorter than `MAX_TOTAL_LENGTH`
     *   file  = caller's file, used in the empty-string error message
     *   line  = caller's line, used in the empty-string error message
     *
     * Throws: Exception when `value` is empty or too long, when the string
     *         would not fit in the buffer, or when `store.set` throws.
     */
    void append(string value, const string file=__FILE__, const size_t line=__LINE__) {
        enforce(value.length > 0, "empty strings [%s] not allowed %s:%d".format(value, file, line));
        enforce(value.length < MAX_TOTAL_LENGTH, "value too long");
        // Reserve a slot under this caller's id with a placeholder value of 0.
        // set returns index+1 or throws, so no retry loop is needed.
        typeof(SIZE) count = store.set(id, 0);
        typeof(SIZE) idx = count - 1;
        version (None) writeln("got it: ", idx, " v:", atomicLoad(store._data[idx].value), " count:", count);
        size_t start, end = value.length;
        if (idx > 0) {
            do {
                start = atomicLoad(store._data[idx-1].value);
            } while (start==0); // need to fetch value until its non zero because previous key might have only just been set
            // add previous string's offset
            start += atomicLoad(store._data[idx-1].key);
        }
        // Skip one byte: offsets double as map keys, and key 0 means "empty".
        ++start;
        end += start;
        version (None) writefln("data[%d..%d]", start, end);
        // `end` already includes `start`; the previous `start+end` check
        // counted the offset twice and refused strings that still fit.
        enforce(end <= _data.length, "no more space");
        // change the key to the current offset
        atomicStore(store._data[idx].key, start);
        atomicStore(store._data[idx].value, value.length);
        // memcpy the string into _data
        _data[start .. end] = value[];
    }

    //string get(size_t idx) { // get a string by its index in the array
    // import std.conv;
    // auto sz = to!string(store.get(id));
    // return sz;
    //}

    /// foreach support: hands each stored string to `dg` in slot order.
    int opApply(int delegate(string value) dg) {
        import std.exception : assumeUnique;
        int n;
        foreach (k,v; store) {
            // performing cast here this data is not supposed to be altered
            string value = assumeUnique(_data[k..k+v]);
            n = dg(value);
            if (n != 0) {
                return n;
            }
        }
        return n;
    }

    /// Number of stored strings, as reported by the index map.
    auto length() {
        return store.length;
    }

private:
    char[] _data;
}
// Slot count of every StringArray built in this module; main() asserts that
// exactly this many strings have been stored once all worker threads finish.
enum TEST_SIZE = 1000;
/// Starts four worker threads that each append TEST_SIZE/4 words to one
/// shared StringArray, then checks the final length and that every stored
/// string is one of the source words.
void main() {
    import core.thread;
    auto array = new shared StringArray!(TEST_SIZE, 16*1024);
    import std.string;
    auto words = `Sed ut perspiciatis unde omnis iste natus error sit voluptatem accusantium doloremque laudantium, totam rem aperiam, eaque ipsa quae ab illo inventore veritatis et quasi architecto beatae vitae dicta sunt explicabo. Nemo enim ipsam voluptatem quia voluptas sit aspernatur aut odit aut fugit, sed quia consequuntur magni dolores eos qui ratione voluptatem sequi nesciunt. Neque porro quisquam est, qui dolorem ipsum quia dolor sit amet, consectetur, adipisci velit, sed quia non numquam eius modi tempora incidunt ut labore et dolore magnam aliquam quaerat voluptatem. Ut enim ad minima veniam, quis nostrum exercitationem ullam corporis suscipit laboriosam, nisi ut aliquid ex ea commodi consequatur? Quis autem vel eum iure reprehenderit qui in ea voluptate velit esse quam nihil molestiae consequatur, vel illum qui dolorem eum fugiat quo voluptas nulla pariatur?`.split();
    //new TestThread!array(1001, 0, TEST_SIZE/2, words).start();

    // One worker per seed, started in this order.
    foreach (seed; [11, 112, 141, 1431]) {
        new TestThread!array(seed, 0, TEST_SIZE/4, words).start();
    }
    thread_joinAll();

    auto total = array.length;
    assert(total == TEST_SIZE, "length test fails:%d".format(total));

    import std.range;
    import std.algorithm;
    size_t seen=0;
    foreach (v; array) {
        scope(exit) ++seen;
        //writeln(v);
        assert(words.canFind(v));
        //writeln("assert (v:", v, " == ", testStrings[seen], ")");
        //assert(v == testStrings[seen]);
    }
}
import core.thread : Thread;
/**
 * Benchmark worker. Picks `max - min` word indices from a seeded generator,
 * times 100 runs of `test1` against 100 runs of `test2`, prints the per-run
 * averages, then appends the picked words to the shared `array`.
 */
final class TestThread(alias array) : Thread {
    import std.conv;
    size_t min, max;
    int seed;
    string[] words;

    this(int seed, size_t min, size_t max, string[] words) {
        super(&run);
        this.seed = seed;
        this.min = min;
        this.max = max;
        this.words = words;
    }

    /// Thread body; anything thrown after the first timing pass is printed.
    void run() {
        import std.random;
        import std.datetime;
        Random gen;
        gen.seed(seed); // just to make the tests come out the same each time
        size_t[] nums;
        foreach (i; min .. max) {
            nums ~= uniform(0, words.length, gen);
        }
        StopWatch sw; sw.start;
        for (int i=0; i<100; i++) {
            test1(words, nums);
        }
        auto singlethreadSpeed = sw.peek;
        //writeln("test: ", testStrings.length, " st:", singlethreadSpeed.usecs, "us");
        try {
            sw.reset();
            for (int i=0; i<100; i++) {
                test2(words, nums);
            }
            auto mtSpeed = sw.peek;
            // The last column is mt divided by st (how many times slower the
            // StringArray run was); it used to be mislabelled "st/mt".
            writefln("st:%sus,\tmt:%sus,\tmt/st:%s", singlethreadSpeed.usecs/100, mtSpeed.usecs/100, (mtSpeed.hnsecs/100)/(singlethreadSpeed.hnsecs/100));
            foreach (i; 0 .. nums.length) {
                array.append(words[nums[i]]);
            }
        } catch (Throwable e) {
            writeln(e);
        }
    }
}
/// Baseline for the benchmark: collects `words[n]` for every `n` in `nums`
/// into a plain local array, which is then discarded.
void test1(string[] words, size_t[] nums) {
    string[] picked;
    foreach (wordIndex; nums) {
        picked ~= words[wordIndex];
    }
}
/// Benchmark counterpart of test1: appends `words[n]` for every `n` in `nums`
/// to a freshly created StringArray.
void test2(string[] words, size_t[] nums) {
    auto target = new shared StringArray!(TEST_SIZE, 16*1024);
    foreach (wordIndex; nums) {
        target.append(words[wordIndex]);
    }
}
module per_thread_id;
import std.stdio;
/**
 * Lazily created identifier based on a random UUID held in a static
 * (thread-local) instance. `id` returns the hash of that UUID.
 */
struct Id {
    import std.uuid;
    private UUID _id;
    static Id _instance; // TLS static variable

    @disable
    this();

    /// Returns the hash of the stored UUID, generating the UUID on first use.
    static auto id() {
        if (_instance._id.empty) {
            _instance = typeof(this).init;
            _instance._id = randomUUID();
        }
        return _instance._id.toHash;
    }

    alias id this;
}
/// Module-level shorthand so callers can write `id` instead of `Id.id`.
alias id = Id.id;

/// Module constructor: generates the identifier up front.
static this() {
    Id.id();
}
// Prints the id seen by three freshly started threads and by the current one.
unittest {
    import core.thread;
    foreach (_; 0 .. 3) {
        auto printer = new Thread({
            writeln(Id.id);
        });
        printer.start();
    }
    writeln(Id.id);
}
@rjmcguire
Copy link
Author

These experiments are based on the algorithm discussed at 0.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment