Skip to content

Instantly share code, notes, and snippets.

@run-dlang
Created April 5, 2018 13:38
Show Gist options
  • Save run-dlang/674169ca4d0c74f81bd301d0a49ac874 to your computer and use it in GitHub Desktop.
Save run-dlang/674169ca4d0c74f81bd301d0a49ac874 to your computer and use it in GitHub Desktop.
Code shared from run.dlang.io.
import std.concurrency : receiveOnly, send,
spawn, Tid, thisTid;
import core.atomic : atomicOp, atomicLoad;
import core.sync.mutex;
// Module-level mutex, declared shared so the
// same instance is visible to every thread.
// It is allocated in main() and taken by
// threadProducer and threadConsumer around
// each queue operation and the print that
// goes with it.
shared Mutex mtx;
/*
Queue that can be used safely from
different threads. All access to an
instance is automatically locked thanks to
the synchronized keyword.
*/
synchronized class SafeQueue(T)
{
    // Backing storage, oldest element first.
    // Fields of a synchronized class have to
    // be private, otherwise D complains.
    private T[] elements;

    /// Append `value` at the back of the queue.
    void push(T value)
    {
        elements ~= value;
    }

    /// Remove and return the oldest element,
    /// or T.init when the queue is empty.
    T pop()
    {
        // Guard clause: nothing stored yet.
        if (elements.length == 0)
            return T.init;

        T head = elements[0];
        // Drop the first slot by re-slicing.
        elements = elements[1 .. $];
        return head;
    }

    /// Number of elements currently stored.
    auto length()
    {
        return elements.length;
    }
}
/*
Safely print messages regardless of the
number of concurrent threads.
Note that args is a variadic parameter
list, i.e. the function accepts anywhere
from 0 to N arguments.
*/
void safePrint(T...)(T args)
{
    import std.stdio : writeln;

    // Only one thread at a time may be inside
    // this synchronized block, so a line is
    // written out completely before another
    // thread can start printing through here.
    synchronized
    {
        writeln(args);
    }
}
/**
Producer thread body: pushes the integers
1 through 10 onto `queue`, bumping
`*queueCounter` once per pushed value.

Params:
    queue        = queue shared with the consumer
    queueCounter = shared counter of values pushed so far
*/
void threadProducer(shared(SafeQueue!int) queue,
    shared(int)* queueCounter)
{
    // Push values 1 to 10 (the upper bound of
    // the range is exclusive).
    foreach (i; 1 .. 11) {
        mtx.lock();
        // Release the mutex however this
        // iteration ends, including when push
        // or safePrint throws; otherwise the
        // consumer would block forever.
        scope (exit) mtx.unlock();

        // Counter update, push and log line are
        // done under one lock so the consumer
        // sees them as a single step.
        atomicOp!"+="(*queueCounter, 1);
        queue.push(i);
        safePrint("Pushed ", i);
    }
}
/**
Consumer thread body: pops ten values from
`queue`, printing each one together with the
current value of `*queueCounter`, then sends
`true` to `owner` to signal completion.

Params:
    owner        = thread to notify when done
    queue        = queue shared with the producer
    queueCounter = shared counter of values pushed so far
*/
void threadConsumer(Tid owner,
    shared(SafeQueue!int) queue,
    shared(int)* queueCounter)
{
    import core.thread : Thread;

    // Number of values to consume before
    // reporting back to the owner.
    enum expectedCount = 10;

    int popped = 0;
    while (popped != expectedCount) {
        // Cheap peek without the mutex; give
        // up the time slice instead of
        // spinning while nothing is queued.
        if (queue.length == 0) {
            Thread.yield();
            continue;
        }

        mtx.lock();
        // Release the mutex however this
        // iteration ends (continue, normal
        // exit or an exception).
        scope (exit) mtx.unlock();

        // Re-check under the lock: pop()
        // returns T.init on an empty queue,
        // which must not be counted as a
        // real element.
        if (queue.length == 0)
            continue;

        auto i = queue.pop();
        safePrint("Popped ", i,
            " (Producer pushed ",
            // safely fetch current value of
            // queueCounter using atomicLoad
            atomicLoad(*queueCounter), ")");
        ++popped;
    }
    // I'm done!
    owner.send(true);
}
/// Entry point: sets up the shared state,
/// starts one producer and one consumer
/// thread and waits for the consumer's
/// completion message.
void main()
{
    // Create the global mutex before any
    // worker thread is started.
    mtx = new shared(Mutex)();

    shared int counter = 0;
    auto queue = new shared(SafeQueue!int);

    spawn(&threadProducer, queue, &counter);
    spawn(&threadConsumer, thisTid, queue, &counter);

    // Block until the consumer sends its
    // bool; it sends true when it is done.
    const finished = receiveOnly!bool();
    assert(finished);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment