Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Dlang non blocking messaging
import std.stdio;
import std.concurrency;
import std.conv;
import core.thread;
shared class Msg
{
string value;
Msg next = null;
Tid from;
this( string value = "" )
{
this.value = value;
this.from = cast(shared(Tid)) thisTid;
}
}
alias shared( Msg[ Tid ] ) Queue;
synchronized shared Queue[ Tid ] bus;
void main()
{
auto cluster = new Cluster;
cluster.run();
}
class Actor
{
void run()
{
shared Msg[ Tid ] msgs;
bus[ thisTid ] = msgs;
this.main();
}
abstract void main();
Tid start( A )()
{
return spawn( function(){
auto actor = new A;
actor.run();
});
}
void eatInfinite()
{
while( true ) {
this.eatAll();
Thread.sleep( dur!("msecs")( 1 ) );
}
}
bool eatAll()
{
bool someEated;
do {
someEated = false;
foreach( Tid tid , ref shared Msg msg ; bus[ thisTid ] ) {
if( !msg ) continue;
if( !msg.next ) continue;
msg = msg.next ;
this.eat( cast(Tid) msg.from , msg.value );
someEated = true;
}
} while( someEated );
return someEated;
}
void eat( Tid from , string value )
{
writeln( "Received: " , from.to!string , value );
}
void feed( Tid to , string value )
{
shared Msg last;
if( thisTid !in bus[ to ] ) {
last = bus[ to ][ thisTid ] = new shared Msg;
} else {
last = bus[ to ][ thisTid ];
}
while( last.next ) last = last.next;
last.next = new shared Msg( value );
}
}
class Cluster : Actor
{
override void main()
{
foreach( uint id ; 0 .. 4 ) {
auto childTid = this.start!Worker;
}
this.eatInfinite();
}
}
class Worker : Actor
{
override void main()
{
foreach( int i ; 1 .. 5 ){
Thread.sleep( dur!("msecs")( 0 ) );
this.feed( ownerTid , "Hey, " ~ i.to!string );
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.