Skip to content

Instantly share code, notes, and snippets.

@kekekeks
Last active August 29, 2015 14:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kekekeks/03746fda297e6793bc5e to your computer and use it in GitHub Desktop.
Save kekekeks/03746fda297e6793bc5e to your computer and use it in GitHub Desktop.
ActorAwait reentrancy
using System;
using System.Runtime.CompilerServices;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Actor.Internal;
using Akka.Dispatch;
using Akka.Dispatch.SysMsg;
namespace Sandbox
{
public static class Program
{
class MyActor : UntypedActor
{
protected override async void OnReceive(object message)
{
if (message is string)
{
Console.WriteLine("Before first delay, message: " + message);
await Task.Delay(1000).ActorAwait(AsyncBehavior.Suspend);
Console.WriteLine("After suspended delay, message: " + message);
await Task.Delay(1000).ActorAwait(AsyncBehavior.Reentrant);
Console.WriteLine("After reentrant delay, message: " + message);
}
}
}
public static void Main()
{
var system = ActorSystem.Create("test");
var actor = system.ActorOf(Props.Create<MyActor>());
actor.Tell("1");
actor.Tell("2");
system.AwaitTermination();
}
//↓↓↓ MAGIC ↓↓↓
public static ActorAwaiter ActorAwait(this Task task, AsyncBehavior behavior = AsyncBehavior.Suspend)
{
return new ActorAwaiter(task, behavior);
}
public class ActorAwaiter : ICriticalNotifyCompletion, INotifyCompletion
{
private readonly Task _task;
private readonly AsyncBehavior _behavior;
private readonly AmbientState _state;
private readonly ActorCell _context;
public ActorAwaiter(Task task, AsyncBehavior behavior)
{
_task = task;
_behavior = behavior;
_context = InternalCurrentActorCellKeeper.Current;
_state = new AmbientState
{
Sender = _context.Sender,
Self = _context.Self
};
}
public void GetResult()
{
_task.GetAwaiter().GetResult();
}
public ActorAwaiter GetAwaiter()
{
return this;
}
public bool IsCompleted{get { return _task.IsCompleted; }}
public void OnCompleted(Action continuation)
{
if(_behavior == AsyncBehavior.Suspend)
_context.SuspendReentrancy();
_task.ContinueWith(t =>
{
_state.Self.Tell(new CompleteTask(_state, () =>
{
if (_behavior == AsyncBehavior.Suspend)
_context.ResumeReentrancy();
continuation();
}), _state.Sender);
}, TaskContinuationOptions.ExecuteSynchronously);
}
public void UnsafeOnCompleted(Action continuation)
{
OnCompleted(continuation);
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment