Last active November 24, 2020 05:45
An interfaced generic-aware Akka.NET actor implementation
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
namespace AkkaDemos
public interface IGreeter
Task<string> Greet(IGreeter who);
Task<string> Greetings(IGreeter input);
Task<int> Compare<T>(T a, T b) where T: IComparable<T>;
public class Greeter : TypedActor<IGreeter>, IGreeter
public async Task<string> Greetings(IGreeter input)
//throw new Exception("BOOM!");
return $"Hello, {input}!";
public async Task<string> Greet(IGreeter who)
var reply = await who.Greetings(Self.Typed<IGreeter>());
return $"{who} replied with: {reply}";
public async Task<int> Compare<T>(T a, T b) where T : IComparable<T>
return a.CompareTo(b);
class Program
static Config Config(int port) => ConfigurationFactory.ParseString($@" = remote {{
hostname =
port = {port}
static async Task Main(string[] args)
using (var sys1 = ActorSystem.Create("system1", Config(4001)))
using (var sys2 = ActorSystem.Create("system2", Config(4002)))
var greeter = sys1.ActorOf(Props.Create<Greeter>(), "greeter").Typed<IGreeter>();
var initiator = sys2.ActorOf(Props.Create<Greeter>(), "initiator").Typed<IGreeter>();
var reply = await initiator.Greet(greeter);
//Console.WriteLine($"1 compared to 2 => {await greeter.Compare(1, 2)}");
using System;
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Reflection;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Dispatch;
using Akka.Util;
namespace AkkaDemos
public abstract class TypedActor<TActor> : ActorBase
private static readonly ImmutableDictionary<string, MethodInfo> methods;
private static readonly Type TaskType = typeof(Task);
static TypedActor()
var builder = ImmutableDictionary.CreateBuilder<string, MethodInfo>();
foreach (var method in typeof(TActor).GetMethods(BindingFlags.Instance | BindingFlags.Public))
builder[method.Name] = method;
methods = builder.ToImmutable();
protected sealed override bool Receive(object message)
if (message is CallMethod call && methods.TryGetValue(call.MethodName, out var method))
if (TaskType.IsAssignableFrom(method.ReturnType))
ActorTaskScheduler.RunTask(() => CallMethodAsync(method, call.Args, call.TypeArgs, Sender));
method.Invoke(this, call.Args);
return true;
else return false;
private async Task CallMethodAsync(MethodInfo method, object[] args, Type[] typeArgs, IActorRef sender)
if (method.IsGenericMethod)
method = method.MakeGenericMethod(typeArgs);
var task = method.Invoke(this, args) as Task;
await task;
if (method.ReturnType.IsGenericType)
ReturnResultDynamic(task, sender);
sender.Tell(new Status.Success(null));
catch (Exception e)
sender.Tell(new Status.Failure(e));
private void ReturnResultDynamic(dynamic task, IActorRef sender)
sender.Tell(new Status.Success(task.Result));
internal sealed class CallMethod
public string MethodName { get; }
public object[] Args { get; }
public Type[] TypeArgs { get; }
public CallMethod(string methodName, object[] args, Type[] typeArgs = null)
MethodName = methodName;
Args = args;
TypeArgs = typeArgs;
public class TypedActorProxy : DispatchProxy, ISurrogated
public sealed class Surrogate : ISurrogate
public IActorRef Self { get; }
public Type ActorType { get; }
public Surrogate(IActorRef self, Type actorType)
Self = self;
ActorType = actorType;
public ISurrogated FromSurrogate(ActorSystem system)
return (ISurrogated)Self.Typed(ActorType);
private static readonly Type VoidType = typeof(void);
private static readonly Type TaskType = typeof(Task);
private static readonly ConcurrentDictionary<Type, MethodInfo> Mappers = new ConcurrentDictionary<Type, MethodInfo>();
private static readonly MethodInfo GenericMapper;
static TypedActorProxy()
GenericMapper = typeof(TypedActorProxy).GetMethod(nameof(MapResult), BindingFlags.Static | BindingFlags.NonPublic);
public IActorRef Self { get; private set; }
protected override object Invoke(MethodInfo method, object[] args)
if (method.ReturnType == VoidType)
Self.Tell(new CallMethod(method.Name, args, method.IsGenericMethod ? method.GetGenericArguments() : null));
return null;
else if (TaskType.IsAssignableFrom(method.ReturnType))
var cancellationToken = default(CancellationToken);
if (!(args is null))
for (int i = args.Length - 1; i >= 0; i--)
if (args[i] is CancellationToken t)
cancellationToken = t;
//TODO: replace CancellationToken with a surrogate version, that can be serialized
var task = Self.Ask<Status>(new CallMethod(method.Name, args, method.IsGenericMethod ? method.GetGenericArguments() : null), cancellationToken);
if (method.ReturnType == TaskType)
return HandleResult(task);
var returnType = method.ReturnType.GetGenericArguments()[0];
var mapper = Mappers.GetOrAdd(returnType, (retType) => GenericMapper.MakeGenericMethod(retType));
return mapper.Invoke(null, new object[]{ task });
throw new InvalidOperationException(
$"TypedActor proxy can be used only for methods, which return either void or Task.");
public override string ToString() => Self.ToString();
public ISurrogate ToSurrogate(ActorSystem system)
var actorType = this.GetType().GetInterfaces()[0];
return new Surrogate(Self, actorType);
private static Task HandleResult(Task<Status> task)
return task.ContinueWith(t =>
if (t.IsCompletedSuccessfully)
switch (t.Result)
case Status.Success _: break;
case Status.Failure f:
private static Task<T> MapResult<T>(Task<Status> task)
return task.ContinueWith(t =>
if (t.IsCompletedSuccessfully)
switch (t.Result)
case Status.Success s: return (T)s.Status;
case Status.Failure f:
return default;
throw new NotSupportedException($"Return value '{t.Result}' is not supported.");
return default;
public static class TypedActors
private static readonly PropertyInfo SelfProperty;
private static readonly ConcurrentDictionary<Type, Func<object>> DispatchProxies = new ConcurrentDictionary<Type, Func<object>>();
static TypedActors()
SelfProperty = typeof(TypedActorProxy).GetProperty("Self");
public static TActor TypedActorOf<TActor>(this IActorRefFactory factory, Props props, string name)
var actorRef = factory.ActorOf(props, name);
return actorRef.Typed<TActor>();
public static TActor TypedActorOf<TActor>(this IActorRefFactory factory, Props props)
var actorRef = factory.ActorOf(props);
return actorRef.Typed<TActor>();
public static TActor Typed<TActor>(this IActorRef actorRef)
var proxy = DispatchProxy.Create<TActor, TypedActorProxy>();
SelfProperty.SetValue(proxy, actorRef);
return proxy;
private static readonly MethodInfo ProxyFactory =
typeof(DispatchProxy).GetMethod("Create", BindingFlags.Public | BindingFlags.Static);
private static Func<Type, Func<object>> DispatchProxyFactory = type =>
var factory = ProxyFactory.MakeGenericMethod(type, typeof(TypedActorProxy));
return () => factory.Invoke(null, null);
internal static object Typed(this IActorRef actorRef, Type actorType)
var factory = DispatchProxies.GetOrAdd(actorType, DispatchProxyFactory);
var proxy = factory();
SelfProperty.SetValue(proxy, actorRef);
return proxy;
