Skip to content

Instantly share code, notes, and snippets.

@pshrosbree
Last active September 20, 2018 16:57
Show Gist options
  • Save pshrosbree/6344173ed4f073158527a1ea7c4eb70f to your computer and use it in GitHub Desktop.
Save pshrosbree/6344173ed4f073158527a1ea7c4eb70f to your computer and use it in GitHub Desktop.
Fair request queueing with Akka.NET
namespace FairQueueing
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Routing;
/// <summary>
/// Forwards every <see cref="Message"/> to a child output actor while keeping at most
/// <see cref="MaxPending"/> messages unacknowledged. Once the cap is reached, further
/// messages are stashed until an acknowledgement (<see cref="OutputResponse"/> with
/// <c>Sent == true</c>) frees a slot.
/// </summary>
public class ApplicationActor : ReceiveActor, IWithUnboundedStash
{
    /// <summary>Upper bound on messages handed to the output actor but not yet acknowledged.</summary>
    private const int MaxPending = 20;

    /// <summary>Messages that were forwarded to the output actor and are awaiting a response.</summary>
    private readonly List<Message> _pending = new List<Message>();

    /// <summary>Child actor doing the actual send; a placeholder until <see cref="PreStart"/> runs.</summary>
    private IActorRef _output;

    /// <summary>Stash used to hold incoming messages while in the waiting behaviour.</summary>
    public IStash Stash { get; set; }

    public ApplicationActor()
    {
        _output = ActorRefs.Nobody;
        Become(Sending);
    }

    /// <summary>Creates the child output actor named "output".</summary>
    protected override void PreStart()
    {
        var outputProps = Props.Create(() => new OutputActor());
        _output = Context.ActorOf(outputProps, "output");
    }

    /// <summary>
    /// Behaviour while below the pending cap: forward immediately and track the message.
    /// </summary>
    private void Sending()
    {
        Receive<Message>(incoming =>
        {
            _output.Tell(incoming);
            _pending.Add(incoming);

            // Cap reached: stop forwarding until an acknowledgement arrives.
            if (_pending.Count >= MaxPending)
            {
                Become(Waiting);
            }
        });

        // Acknowledged send: the message is no longer pending.
        Receive<OutputResponse>(
            ack => { _pending.Remove(ack.Message); },
            ack => ack.Sent);

        // Failed send: hand the same message back to the output actor.
        Receive<OutputResponse>(
            nack => { _output.Tell(nack.Message); },
            nack => !nack.Sent);
    }

    /// <summary>
    /// Behaviour while at the pending cap: stash new messages and resume sending once an
    /// acknowledgement brings the pending count back under the cap.
    /// </summary>
    private void Waiting()
    {
        Receive<Message>(incoming => { Stash.Stash(); });

        Receive<OutputResponse>(
            ack =>
            {
                _pending.Remove(ack.Message);
                if (_pending.Count >= MaxPending)
                {
                    return;
                }

                Stash.UnstashAll();
                Become(Sending);
            },
            ack => ack.Sent);

        // Failed send: retry through the output actor; the message stays pending.
        Receive<OutputResponse>(
            nack => { _output.Tell(nack.Message); },
            nack => !nack.Sent);
    }
}
/// <summary>
/// Console entry point: builds the actor system, wires the display, application (pooled)
/// and endpoint actors together, starts the load and blocks until the system terminates.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        var system = ActorSystem.Create("MyActorSystem");

        // Display actor lives at /user/display.
        var displayProps = Props.Create(() => new DisplayActor());
        var displayActor = system.ActorOf(displayProps, "display");
        Console.WriteLine($"Display actor: {displayActor.Path}");

        // Pool of ten application actors behind a consistent-hashing router.
        var applicationProps = Props
            .Create(() => new ApplicationActor())
            .WithRouter(new ConsistentHashingPool(10));
        var applicationActor = system.ActorOf(applicationProps, "application");

        // The endpoint generates the message load and drives shutdown.
        var endpointProps = Props.Create(() => new EndpointActor(applicationActor));
        var endpointActor = system.ActorOf(endpointProps, "endpoint");
        endpointActor.Tell(Command.Start);

        system.WhenTerminated.Wait();
    }
}
/// <summary>
/// Simulates an asynchronous sender. Non-terminate messages are "sent" with a small random
/// delay and the resulting <see cref="OutputResponse"/> is piped back to this actor, which
/// then notifies the display actor and replies to the original sender. A terminate message
/// schedules a <see cref="PoisonPill"/> for this actor.
/// </summary>
public class OutputActor : ReceiveActor
{
    private readonly Random _random = new Random((int)(DateTime.Now.Ticks % int.MaxValue));
    private readonly ActorSelection _display;

    public OutputActor()
    {
        _display = Context.ActorSelection("/user/display");

        Receive<OutputResponse>(response => OnSendCompleted(response));
        Receive<Message>(message => BeginSend(message), message => !message.Terminate);
        Receive<Message>(message => ScheduleShutdown(), message => message.Terminate);
    }

    /// <summary>Publishes a completed send to the display actor and answers the requester.</summary>
    private void OnSendCompleted(OutputResponse response)
    {
        _display.Tell(response.Message);
        Sender.Tell(response);
    }

    /// <summary>
    /// Starts the simulated send and pipes its result to this actor, keeping the current
    /// sender as the sender of the piped message.
    /// </summary>
    private void BeginSend(Message message)
    {
        var pendingSend = SendAsync(message);
        pendingSend.PipeTo(Self, Sender);
    }

    /// <summary>Schedules a poison pill for this actor 100 ms from now.</summary>
    private void ScheduleShutdown()
    {
        var delay = TimeSpan.FromMilliseconds(100);
        Context.System.Scheduler.ScheduleTellOnce(delay, Self, PoisonPill.Instance, Sender);
    }

    /// <summary>
    /// Simulation of an asynchronous send: waits a random 0-4 ms and reports success.
    /// </summary>
    private async Task<OutputResponse> SendAsync(Message message)
    {
        var pauseMilliseconds = _random.Next(0, 5);
        await Task.Delay(TimeSpan.FromMilliseconds(pauseMilliseconds)).ConfigureAwait(false);
        return new OutputResponse(message, true);
    }
}
/// <summary>
/// Control commands handled by <see cref="EndpointActor"/>.
/// </summary>
public enum Command
{
    /// <summary>Generate the message load and schedule <see cref="Stop"/>.</summary>
    Start = 0,

    /// <summary>Send terminate messages to the applications and schedule <see cref="Terminate"/>.</summary>
    Stop = 1,

    /// <summary>Terminate the actor system.</summary>
    Terminate = 2
}
/// <summary>
/// Outcome of an attempt to send a <see cref="Message"/>.
/// </summary>
public readonly struct OutputResponse
{
    /// <summary>The message the send attempt was made for.</summary>
    public Message Message { get; }

    /// <summary><c>true</c> when the message was sent; <c>false</c> when the attempt failed.</summary>
    public bool Sent { get; }

    /// <summary>Creates a response for <paramref name="message"/> with the given outcome.</summary>
    /// <param name="message">The message the send attempt was made for.</param>
    /// <param name="sent">Whether the send succeeded.</param>
    public OutputResponse(Message message, bool sent) => (Message, Sent) = (message, sent);
}
/// <summary>
/// A request belonging to an application. Routing (<see cref="ConsistentHashKey"/>),
/// equality and hashing are all based solely on <see cref="AppId"/>; the
/// <see cref="Terminate"/> flag does not take part in comparisons.
/// </summary>
public readonly struct Message : IConsistentHashable, IEquatable<Message>, IEqualityComparer<Message>
{
    /// <summary>Creates a message for the given application.</summary>
    /// <param name="appId">Identifier of the application the message belongs to.</param>
    /// <param name="terminate">Whether this message asks the receiver to shut down.</param>
    public Message(int appId, bool terminate)
    {
        AppId = appId;
        Terminate = terminate;
    }

    /// <summary>Identifier of the application the message belongs to.</summary>
    public int AppId { get; }

    /// <summary>Whether this message asks the receiver to shut down.</summary>
    public bool Terminate { get; }

    /// <summary>Key used by the consistent-hashing router: the application id.</summary>
    public object ConsistentHashKey => AppId;

    /// <inheritdoc />
    // `is` already rejects null, so no separate null check is needed.
    public override bool Equals(object obj) => obj is Message other && Equals(other);

    /// <inheritdoc />
    public override int GetHashCode() => AppId.GetHashCode();

    /// <summary>Two messages are equal when they belong to the same application.</summary>
    public bool Equals(Message other) => AppId.Equals(other.AppId);

    /// <summary>Comparer form of equality: compares the application ids of the two arguments.</summary>
    public bool Equals(Message x, Message y) => x.AppId == y.AppId;

    /// <summary>
    /// Comparer form of hashing. Hashes the argument (not this instance) so that it is
    /// consistent with <see cref="Equals(Message, Message)"/>.
    /// </summary>
    public int GetHashCode(Message obj) => obj.AppId.GetHashCode();
}
/// <summary>
/// Stub that produces a nonuniform, unbalanced message load for the applications and then
/// drives the stop/terminate sequence via self-scheduled <see cref="Command"/>s.
/// </summary>
public class EndpointActor : ReceiveActor
{
    private readonly Random _rng = new Random((int)(DateTime.Now.Ticks % int.MaxValue));

    /// <summary>Messages generated per round for application ids 1, 2 and 3 respectively.</summary>
    private readonly int[] _perAppCounts = { 10, 10, 100 };

    private readonly IActorRef _router;

    /// <param name="applicationRouter">Actor that receives every generated message.</param>
    public EndpointActor(IActorRef applicationRouter)
    {
        _router = applicationRouter;

        Receive<Command>(command => OnStart(), command => command == Command.Start);
        Receive<Command>(command => OnStop(), command => command == Command.Stop);
        Receive<Command>(command => OnTerminate(), command => command == Command.Terminate);
    }

    /// <summary>
    /// Sends five rounds of messages, each round shuffled by a random sort key, then
    /// schedules <see cref="Command.Stop"/> two seconds later.
    /// </summary>
    private void OnStart()
    {
        for (var round = 0; round < 5; round++)
        {
            var batch = Enumerable
                .Range(0, _perAppCounts.Length)
                .SelectMany(index => Enumerable.Repeat(new Message(index + 1, false), _perAppCounts[index]));

            foreach (var message in batch.OrderBy(unused => _rng.NextDouble()))
            {
                _router.Tell(message);
            }
        }

        Console.WriteLine("Scheduling stop");
        Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(2), Self, Command.Stop, Self);
    }

    /// <summary>
    /// Sends one terminate message per application id, then schedules
    /// <see cref="Command.Terminate"/> three seconds later.
    /// </summary>
    private void OnStop()
    {
        foreach (var appId in Enumerable.Range(1, _perAppCounts.Length))
        {
            _router.Tell(new Message(appId, true));
        }

        Console.WriteLine("Scheduling termination");
        Context.System.Scheduler.ScheduleTellOnce(TimeSpan.FromSeconds(3), Self, Command.Terminate, Self);
    }

    /// <summary>Shuts the actor system down.</summary>
    private void OnTerminate()
    {
        Console.WriteLine("Terminating");
        Context.System.Terminate();
    }
}
/// <summary>
/// Prints run lengths of consecutive messages per application id. It only observes the
/// traffic and is not involved in the queueing algorithm.
/// </summary>
public class DisplayActor : ReceiveActor
{
    /// <summary>Application id of the current run; 0 means no run is in progress.</summary>
    private int _currentAppId;

    /// <summary>Number of messages seen in the current run.</summary>
    private int _runLength;

    /// <summary>Number of messages seen overall.</summary>
    private int _total;

    public DisplayActor()
    {
        // Message continues the current run (or starts the first one).
        Receive<Message>(
            message => ExtendRun(message),
            message => _currentAppId == 0 || message.AppId == _currentAppId);

        // Message from a different application: report the finished run, start a new one.
        Receive<Message>(message => StartNewRun(message));

        // Idle for a while with a run in progress: report it and reset.
        Receive<ReceiveTimeout>(
            timeout => FlushRun(),
            timeout => _currentAppId != 0);
    }

    /// <summary>Arms the idle timeout (75 ms) that triggers flushing of the current run.</summary>
    protected override void PreStart()
    {
        Context.SetReceiveTimeout(TimeSpan.FromMilliseconds(75));
    }

    private void ExtendRun(Message message)
    {
        _currentAppId = message.AppId;
        _runLength += 1;
        _total += 1;
    }

    private void StartNewRun(Message message)
    {
        Report();
        _currentAppId = message.AppId;
        _runLength = 1;
        _total += 1;
    }

    private void FlushRun()
    {
        Report();
        _currentAppId = 0;
        _runLength = 0;
    }

    /// <summary>Writes the running total and the current run's app id and length.</summary>
    private void Report()
    {
        Console.WriteLine($"[{_total:D4}]: AppId {_currentAppId} = {_runLength}");
    }
}
}
<Project Sdk="Microsoft.NET.Sdk">
<!-- Build settings for the FairQueueing console sample. -->
<PropertyGroup>
<!-- Build an executable rather than a library. -->
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<!-- NOTE(review): presumably set so that newer C# syntax used by the sample (e.g. `readonly struct`) compiles; confirm. -->
<LangVersion>latest</LangVersion>
</PropertyGroup>
<ItemGroup>
<!-- Akka.NET core package: provides the Akka.Actor and Akka.Routing namespaces the sample imports. -->
<PackageReference Include="Akka" Version="1.3.9" />
</ItemGroup>
</Project>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment