Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Last active November 24, 2020 05:45
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Horusiath/6051e87fb9f3e81dc2dfed93519fb753 to your computer and use it in GitHub Desktop.
Save Horusiath/6051e87fb9f3e81dc2dfed93519fb753 to your computer and use it in GitHub Desktop.
An interfaced generic-aware Akka.NET actor implementation
using System;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Configuration;
namespace AkkaDemos
{
public interface IGreeter
{
Task<string> Greet(IGreeter who);
Task<string> Greetings(IGreeter input);
Task<int> Compare<T>(T a, T b) where T: IComparable<T>;
}
public class Greeter : TypedActor<IGreeter>, IGreeter
{
public async Task<string> Greetings(IGreeter input)
{
//throw new Exception("BOOM!");
return $"Hello, {input}!";
}
public async Task<string> Greet(IGreeter who)
{
var reply = await who.Greetings(Self.Typed<IGreeter>());
return $"{who} replied with: {reply}";
}
public async Task<int> Compare<T>(T a, T b) where T : IComparable<T>
{
return a.CompareTo(b);
}
}
class Program
{
static Config Config(int port) => ConfigurationFactory.ParseString($@"
akka.actor.provider = remote
akka.remote.dot-netty.tcp {{
hostname = 127.0.0.1
port = {port}
}}");
static async Task Main(string[] args)
{
using (var sys1 = ActorSystem.Create("system1", Config(4001)))
using (var sys2 = ActorSystem.Create("system2", Config(4002)))
{
var greeter = sys1.ActorOf(Props.Create<Greeter>(), "greeter").Typed<IGreeter>();
var initiator = sys2.ActorOf(Props.Create<Greeter>(), "initiator").Typed<IGreeter>();
var reply = await initiator.Greet(greeter);
Console.WriteLine(reply);
//Console.WriteLine($"1 compared to 2 => {await greeter.Compare(1, 2)}");
}
}
}
}
using System;
using System.Collections.Concurrent;
using System.Collections.Immutable;
using System.Reflection;
using System.Runtime.ExceptionServices;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Dispatch;
using Akka.Util;
namespace AkkaDemos
{
public abstract class TypedActor<TActor> : ActorBase
{
private static readonly ImmutableDictionary<string, MethodInfo> methods;
private static readonly Type TaskType = typeof(Task);
static TypedActor()
{
var builder = ImmutableDictionary.CreateBuilder<string, MethodInfo>();
foreach (var method in typeof(TActor).GetMethods(BindingFlags.Instance | BindingFlags.Public))
{
builder[method.Name] = method;
}
methods = builder.ToImmutable();
}
protected sealed override bool Receive(object message)
{
if (message is CallMethod call && methods.TryGetValue(call.MethodName, out var method))
{
if (TaskType.IsAssignableFrom(method.ReturnType))
{
ActorTaskScheduler.RunTask(() => CallMethodAsync(method, call.Args, call.TypeArgs, Sender));
}
else
{
method.Invoke(this, call.Args);
}
return true;
}
else return false;
}
private async Task CallMethodAsync(MethodInfo method, object[] args, Type[] typeArgs, IActorRef sender)
{
try
{
if (method.IsGenericMethod)
{
method = method.MakeGenericMethod(typeArgs);
}
var task = method.Invoke(this, args) as Task;
await task;
if (method.ReturnType.IsGenericType)
{
ReturnResultDynamic(task, sender);
}
else
{
sender.Tell(new Status.Success(null));
}
}
catch (Exception e)
{
sender.Tell(new Status.Failure(e));
throw;
}
}
private void ReturnResultDynamic(dynamic task, IActorRef sender)
{
sender.Tell(new Status.Success(task.Result));
}
}
internal sealed class CallMethod
{
public string MethodName { get; }
public object[] Args { get; }
public Type[] TypeArgs { get; }
public CallMethod(string methodName, object[] args, Type[] typeArgs = null)
{
MethodName = methodName;
Args = args;
TypeArgs = typeArgs;
}
}
public class TypedActorProxy : DispatchProxy, ISurrogated
{
public sealed class Surrogate : ISurrogate
{
public IActorRef Self { get; }
public Type ActorType { get; }
public Surrogate(IActorRef self, Type actorType)
{
Self = self;
ActorType = actorType;
}
public ISurrogated FromSurrogate(ActorSystem system)
{
return (ISurrogated)Self.Typed(ActorType);
}
}
private static readonly Type VoidType = typeof(void);
private static readonly Type TaskType = typeof(Task);
private static readonly ConcurrentDictionary<Type, MethodInfo> Mappers = new ConcurrentDictionary<Type, MethodInfo>();
private static readonly MethodInfo GenericMapper;
static TypedActorProxy()
{
GenericMapper = typeof(TypedActorProxy).GetMethod(nameof(MapResult), BindingFlags.Static | BindingFlags.NonPublic);
}
public IActorRef Self { get; private set; }
protected override object Invoke(MethodInfo method, object[] args)
{
if (method.ReturnType == VoidType)
{
Self.Tell(new CallMethod(method.Name, args, method.IsGenericMethod ? method.GetGenericArguments() : null));
return null;
}
else if (TaskType.IsAssignableFrom(method.ReturnType))
{
var cancellationToken = default(CancellationToken);
if (!(args is null))
{
for (int i = args.Length - 1; i >= 0; i--)
{
if (args[i] is CancellationToken t)
{
cancellationToken = t;
//TODO: replace CancellationToken with a surrogate version, that can be serialized
break;
}
}
}
var task = Self.Ask<Status>(new CallMethod(method.Name, args, method.IsGenericMethod ? method.GetGenericArguments() : null), cancellationToken);
if (method.ReturnType == TaskType)
{
return HandleResult(task);
}
else
{
var returnType = method.ReturnType.GetGenericArguments()[0];
var mapper = Mappers.GetOrAdd(returnType, (retType) => GenericMapper.MakeGenericMethod(retType));
return mapper.Invoke(null, new object[]{ task });
}
}
else
{
throw new InvalidOperationException(
$"TypedActor proxy can be used only for methods, which return either void or Task.");
}
}
public override string ToString() => Self.ToString();
public ISurrogate ToSurrogate(ActorSystem system)
{
var actorType = this.GetType().GetInterfaces()[0];
return new Surrogate(Self, actorType);
}
private static Task HandleResult(Task<Status> task)
{
return task.ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
{
switch (t.Result)
{
case Status.Success _: break;
case Status.Failure f:
ExceptionDispatchInfo.Capture(f.Cause).Throw();
break;
}
}
else
{
ExceptionDispatchInfo.Capture(task.Exception).Throw();
}
});
}
private static Task<T> MapResult<T>(Task<Status> task)
{
return task.ContinueWith(t =>
{
if (t.IsCompletedSuccessfully)
{
switch (t.Result)
{
case Status.Success s: return (T)s.Status;
case Status.Failure f:
ExceptionDispatchInfo.Capture(f.Cause).Throw();
return default;
default:
throw new NotSupportedException($"Return value '{t.Result}' is not supported.");
}
}
else
{
ExceptionDispatchInfo.Capture(task.Exception).Throw();
return default;
}
});
}
}
public static class TypedActors
{
private static readonly PropertyInfo SelfProperty;
private static readonly ConcurrentDictionary<Type, Func<object>> DispatchProxies = new ConcurrentDictionary<Type, Func<object>>();
static TypedActors()
{
SelfProperty = typeof(TypedActorProxy).GetProperty("Self");
}
public static TActor TypedActorOf<TActor>(this IActorRefFactory factory, Props props, string name)
{
var actorRef = factory.ActorOf(props, name);
return actorRef.Typed<TActor>();
}
public static TActor TypedActorOf<TActor>(this IActorRefFactory factory, Props props)
{
var actorRef = factory.ActorOf(props);
return actorRef.Typed<TActor>();
}
public static TActor Typed<TActor>(this IActorRef actorRef)
{
var proxy = DispatchProxy.Create<TActor, TypedActorProxy>();
SelfProperty.SetValue(proxy, actorRef);
return proxy;
}
private static readonly MethodInfo ProxyFactory =
typeof(DispatchProxy).GetMethod("Create", BindingFlags.Public | BindingFlags.Static);
private static Func<Type, Func<object>> DispatchProxyFactory = type =>
{
var factory = ProxyFactory.MakeGenericMethod(type, typeof(TypedActorProxy));
return () => factory.Invoke(null, null);
};
internal static object Typed(this IActorRef actorRef, Type actorType)
{
var factory = DispatchProxies.GetOrAdd(actorType, DispatchProxyFactory);
var proxy = factory();
SelfProperty.SetValue(proxy, actorRef);
return proxy;
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment