Created
April 5, 2018 13:38
-
-
Save run-dlang/674169ca4d0c74f81bd301d0a49ac874 to your computer and use it in GitHub Desktop.
Code shared from run.dlang.io.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import std.concurrency : receiveOnly, send, | |
spawn, Tid, thisTid; | |
import core.atomic : atomicOp, atomicLoad; | |
import core.sync.mutex; | |
// Module-level mutex visible to every thread. main() allocates it before
// spawning the workers; threadProducer and threadConsumer lock it around
// each queue operation and the print that reports it.
shared Mutex mtx; | |
/**
 * FIFO queue intended to be shared between threads.
 *
 * The class is declared `synchronized`, so every member function is
 * executed under the instance's lock.
 */
synchronized class SafeQueue(T)
{
    // Fields of a synchronized class have to be private,
    // otherwise the compiler rejects the declaration.
    private T[] elements;

    /// Appends `value` at the back of the queue.
    void push(T value)
    {
        elements ~= value;
    }

    /// Removes and returns the front element; yields `T.init`
    /// when the queue holds nothing.
    T pop()
    {
        if (elements.length == 0)
            return T.init;

        T head = elements[0];
        elements = elements[1 .. $];
        return head;
    }

    /// Number of elements currently stored.
    auto length()
    {
        return elements.length;
    }
}
/**
 * Prints a message without interleaving output from concurrently
 * running threads.
 *
 * `args` is a variadic template parameter list, so the caller may pass
 * anything from zero to N values; they are all forwarded to `writeln`.
 */
void safePrint(T...)(T args)
{
    import std.stdio : writeln;

    // Critical section: entered by only one thread at a time.
    synchronized
    {
        writeln(args);
    }
}
/**
 * Producer thread body: pushes the integers 1 through 10 onto `queue`.
 *
 * For every value, while holding the module-level `mtx`, it atomically
 * increments `*queueCounter`, pushes the value and prints "Pushed <i>".
 *
 * Params:
 *   queue        = queue shared with the consumer thread
 *   queueCounter = shared counter, incremented once per pushed value
 */
void threadProducer(shared(SafeQueue!int) queue,
    shared(int)* queueCounter)
{
    // The range 1 .. 11 is half-open: it yields 1 through 10.
    foreach (i; 1 .. 11)
    {
        mtx.lock();
        // Release the mutex on every exit path of this iteration, so an
        // exception from push/safePrint cannot leave it locked forever.
        scope (exit) mtx.unlock();

        atomicOp!"+="(*queueCounter, 1);
        queue.push(i);
        safePrint("Pushed ", i);
    }
}
/**
 * Consumer thread body: pops ten values from `queue`, printing each one
 * together with the current value of `*queueCounter`, then sends `true`
 * to `owner` to signal completion.
 *
 * Params:
 *   owner        = thread to notify once all ten values were popped
 *   queue        = queue shared with the producer thread
 *   queueCounter = shared counter maintained by the producer
 */
void threadConsumer(Tid owner,
    shared(SafeQueue!int) queue,
    shared(int)* queueCounter)
{
    import core.thread : Thread;

    int popped = 0;
    while (popped != 10)
    {
        if (queue.length == 0)
        {
            // Nothing to consume yet: give up the time slice instead of
            // spinning at full speed while waiting for the producer.
            Thread.yield();
            continue;
        }

        {
            mtx.lock();
            // Release the mutex on every exit path of this scope, so an
            // exception from pop/safePrint cannot leave it locked forever.
            scope (exit) mtx.unlock();

            auto i = queue.pop();
            safePrint("Popped ", i,
                " (Producer pushed ",
                // read the counter with an atomic load
                atomicLoad(*queueCounter), ")");
        }
        ++popped;
    }

    // I'm done!
    owner.send(true);
}
/**
 * Entry point: creates the shared mutex, queue and counter, starts one
 * producer and one consumer thread, and blocks until the consumer
 * reports that it has finished.
 */
void main()
{
    // The workers lock this mutex, so it must exist before they start.
    mtx = new shared(Mutex)();

    shared int counter = 0;
    auto queue = new shared(SafeQueue!int);

    spawn(&threadProducer, queue, &counter);
    spawn(&threadConsumer, thisTid, queue, &counter);

    // Wait for the consumer's completion message. The receive stays
    // outside the assert so it still runs when asserts are compiled out.
    auto stopped = receiveOnly!bool();
    assert(stopped);
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment