Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Last active December 15, 2015 16:51
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
Star You must be signed in to star a gist
Save Horusiath/e0c0bfc9eabbe096cb21 to your computer and use it in GitHub Desktop.
This is example of returning responses from actor refs in form of the enumerable.
public static class EnumerablePatterns
{
public static IEnumerable<T> Query<T>(this ICanTell self, object message = null)
{
var provider = ResolveProvider(self);
if (provider == null)
throw new NotSupportedException("Unable to resolve the target Provider");
var enumerable = new BlockingCollection<T>();
var path = provider.TempPath();
Action unregister = () => provider.UnregisterTempActor(path);
var queryable = new EnumerableActorRef<T>(enumerable, unregister, path);
provider.RegisterTempActor(queryable, path);
self.Tell(message ?? Start.Instance, queryable);
return enumerable.GetConsumingEnumerable();
}
private static IActorRefProvider ResolveProvider(ICanTell self)
{
IInternalActorRef internalRef;
if ((internalRef = self as IInternalActorRef) != null)
return internalRef.Provider;
ActorSelection selection;
if ((selection = self as ActorSelection) != null)
return ResolveProvider(selection.Anchor);
return null;
}
}
internal class EnumerableActorRef<T> : MinimalActorRef
{
private readonly BlockingCollection<T> _enumerable;
private readonly Action _unregister;
private readonly ActorPath _path;
internal EnumerableActorRef(BlockingCollection<T> enumerable, Action unregister, ActorPath path)
{
_enumerable = enumerable;
_unregister = unregister;
_path = path;
}
public override ActorPath Path { get { return _path; } }
public override IActorRefProvider Provider { get { throw new NotSupportedException(); } }
protected override void TellInternal(object message, IActorRef sender)
{
if (message is ISystemMessage)
SendSystemMessage(message.AsInstanceOf<ISystemMessage>(), sender);
else if (message is Completed)
{
_unregister();
_enumerable.CompleteAdding();
_enumerable.Dispose();
}
else if (message is T)
_enumerable.Add((T)message);
}
}
public struct Completed
{
public static readonly Completed Instance = new Completed();
}
public struct Start
{
public static readonly Start Instance = new Start();
}
public class MyActor : ReceiveActor
{
public MyActor()
{
ReceiveAny(_ =>
{
StartProducing(Sender);
});
// in order to close enumerable, this actor should send Complete.Instance to the same sender, which received initial response
}
private void StartProducing(IActorRef sender)
{
var sec = TimeSpan.FromSeconds(1);
var i = 1;
Context.System.Scheduler.Advanced.ScheduleRepeatedly(sec, sec, () => sender.Tell(i++));
}
}
class Program
{
static void Main(string[] args)
{
using (var system = ActorSystem.Create("system"))
{
var aref = system.ActorOf(Props.Create<MyActor>(), "child");
var q = (from i in aref.Query<int>()
where i > 3 && i < 100
select i + ", ");
// In the example, each iteration in foreach will run, when corresponding value will be send by an actor -
// in this case first time after few seconds (because we're filtering on 3 as lowerbound), and then after
// every 1 sec. But it will never end, as we don't tell enumerable to be closed at any time
foreach (var i in q)
Console.Write(i);
Console.ReadLine();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment