Skip to content

Instantly share code, notes, and snippets.

@ncthbrt
Last active November 16, 2016 14:30
Show Gist options
  • Save ncthbrt/b9493da4e977bee8024634a973bbecf3 to your computer and use it in GitHub Desktop.
Save ncthbrt/b9493da4e977bee8024634a973bbecf3 to your computer and use it in GitHub Desktop.
Factorial Toy
using System;
using System.Numerics;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Microsoft.AspNetCore.Mvc;
namespace ClassicTestAkkaApp
{
/// <summary>
/// Simple payload type that carries one arbitrary-precision integer.
/// </summary>
public class NumberRequest
{
    /// <summary>The number carried by the request; zero until assigned.</summary>
    public BigInteger Value { get; set; } = BigInteger.Zero;
}
/// <summary>
/// HTTP endpoint that computes factorials by asking the farmer actor
/// published on <c>SystemActors.TestFarmerActor</c>.
/// </summary>
[Route("/api/factorial/")]
public class HomeModule : Controller
{
    /// <summary>How long a request waits for the actor system before giving up.</summary>
    public TimeSpan MaxCpuTime = TimeSpan.FromMinutes(1);

    public HomeModule()
    {
    }

    /// <summary>
    /// Computes <paramref name="number"/> factorial.
    /// </summary>
    /// <param name="number">Route segment holding the decimal representation of a non-negative integer.</param>
    /// <param name="token">Request cancellation token; it is currently not forwarded to the actor ask.</param>
    /// <returns>
    /// 200 with the factorial as text; 400 when the input is not a non-negative integer
    /// or when the computation does not finish within <see cref="MaxCpuTime"/>.
    /// </returns>
    [HttpGet("{number}")]
    public async Task<IActionResult> Get([FromRoute] string number, CancellationToken token)
    {
        // Validate at the boundary: malformed input used to throw FormatException out of
        // BigInteger.Parse (an unhandled error), and a negative value can never produce a result.
        BigInteger parsed;
        if (!BigInteger.TryParse(number, out parsed) || parsed.Sign < 0)
        {
            return BadRequest($"'{number}' is not a non-negative integer");
        }

        // The handle lets us tell the farmer which request to abandon if the ask times out.
        var handle = Guid.NewGuid();
        try
        {
            var result = await SystemActors.TestFarmerActor.Ask<BigInteger>(new TestFarmerActorFactorialCommand(parsed, handle), MaxCpuTime);
            return Ok(result.ToString());
        }
        catch (TaskCanceledException)
        {
            SystemActors.TestFarmerActor.Tell(new TestFarmerActorFactorialCancelCommand(handle));
            return BadRequest($"{number} factorial could not be computed within {MaxCpuTime.TotalSeconds} seconds");
        }
    }
}
}
using System;
using System.Collections.Generic;
using static LanguageExt.Prelude;
using System.Numerics;
using System.Threading;
using System.Threading.Tasks;
using Akka.Actor;
using Akka.Routing;
using WhereIsMyTransport.Functional;
using static WhereIsMyTransport.Functional.WPrelude;
namespace ClassicTestAkkaApp
{
/// <summary>
/// Result of multiplying one partition of a factorial computation.
/// </summary>
public class ProductActorResponse
{
    /// <summary>The product computed for the partition.</summary>
    public BigInteger Result { get; private set; }

    /// <summary>The actor that originally asked for the factorial (copied from the request).</summary>
    public IActorRef Sender { get; private set; }

    /// <summary>Identifier of the partition this result belongs to.</summary>
    public Guid PartitionRef { get; private set; }

    /// <summary>Creates an empty response with default property values.</summary>
    public ProductActorResponse()
    {
    }

    /// <summary>Creates a fully populated response.</summary>
    public ProductActorResponse(BigInteger result, IActorRef sender, Guid partitionRef)
    {
        this.Result = result;
        this.Sender = sender;
        this.PartitionRef = partitionRef;
    }
}
/// <summary>
/// Asks a product actor to abandon the work it is doing on behalf of a given requester.
/// </summary>
public class CancelProductActorRequest
{
    /// <summary>The requester whose outstanding work should be cancelled.</summary>
    public IActorRef Sender { get; private set; }

    /// <summary>Creates an empty request with no requester set.</summary>
    public CancelProductActorRequest()
    {
    }

    /// <summary>Creates a cancel request for the given requester.</summary>
    public CancelProductActorRequest(IActorRef sender)
    {
        this.Sender = sender;
    }
}
/// <summary>
/// Asks a product actor to multiply together the integers of one partition.
/// </summary>
public class ProductActorRequest
{
    /// <summary>First factor of the partition.</summary>
    public BigInteger Lower { get; private set; }

    /// <summary>Last factor of the partition.</summary>
    public BigInteger Upper { get; private set; }

    /// <summary>The actor that originally asked for the factorial.</summary>
    public IActorRef Sender { get; private set; }

    /// <summary>Identifier used to match the response to this partition.</summary>
    public Guid PartitionRef { get; private set; }

    /// <summary>Creates an empty request with default property values.</summary>
    public ProductActorRequest()
    {
    }

    /// <summary>Creates a fully populated request.</summary>
    public ProductActorRequest(BigInteger lower, BigInteger upper, IActorRef sender, Guid partitionRef)
    {
        this.Lower = lower;
        this.Upper = upper;
        this.Sender = sender;
        this.PartitionRef = partitionRef;
    }
}
/// <summary>
/// Worker actor that multiplies the integers of one partition on a thread-pool task
/// and pipes the result back to itself before forwarding it to whoever sent the request.
/// </summary>
public class ProductActor : ReceiveActor
{
    // Work currently running on this routee, keyed by partition id. Keying by partition
    // (rather than by requester) lets one routee hold several partitions of the same
    // request without one entry overwriting another. Item1 is the original requester,
    // Item2 the token source that can stop the computation.
    private readonly IDictionary<Guid, Tuple<IActorRef, CancellationTokenSource>> _inFlight =
        new Dictionary<Guid, Tuple<IActorRef, CancellationTokenSource>>();

    /// <summary>
    /// Multiplies every integer from <paramref name="lower"/> to <paramref name="upper"/>, both inclusive.
    /// Static so the background task never touches actor state.
    /// </summary>
    /// <exception cref="OperationCanceledException">The token was cancelled while multiplying.</exception>
    private static BigInteger Calculate(BigInteger upper, BigInteger lower, CancellationToken token)
    {
        BigInteger product = 1;
        for (var i = lower; i <= upper; ++i)
        {
            // Without this check a cancel could only stop a task that had not started yet.
            token.ThrowIfCancellationRequested();
            product *= i;
        }
        return product;
    }

    public ProductActor()
    {
        // Completed computation piped back from the task: clean up, then forward to the original sender.
        Receive<Tuple<IActorRef, ProductActorResponse>>(response =>
        {
            Tuple<IActorRef, CancellationTokenSource> finished;
            if (_inFlight.TryGetValue(response.Item2.PartitionRef, out finished))
            {
                _inFlight.Remove(response.Item2.PartitionRef);
                // The task has already completed, so nothing observes the token any more.
                finished.Item2.Dispose();
            }
            response.Item1.Tell(response.Item2);
        });

        // Cancellation wrapped in a Broadcast envelope, as sent by the farmer.
        Receive<Broadcast>(broadcast => broadcast.Message is CancelProductActorRequest, cancelBroadCast =>
        {
            HandleCancel((CancelProductActorRequest) cancelBroadCast.Message);
        });

        // NOTE(review): routers normally unwrap Broadcast before delivering to routees, in which
        // case the bare message arrives here instead of the envelope — confirm against the Akka.NET version in use.
        Receive<CancelProductActorRequest>(cancel => HandleCancel(cancel));

        Receive<ProductActorRequest>(request =>
        {
            var tokenSource = new CancellationTokenSource();
            _inFlight[request.PartitionRef] = new Tuple<IActorRef, CancellationTokenSource>(request.Sender, tokenSource);

            // Capture everything the task needs while still on the actor's thread.
            var token = tokenSource.Token;
            var sender = Sender;
            Task.Run(() => Tuple(sender, new ProductActorResponse(Calculate(request.Upper, request.Lower, token), request.Sender, request.PartitionRef)), token).PipeTo(Self);
        });
    }

    // Cancels every partition this routee is running for the given requester. Routees that hold
    // no work for that requester simply do nothing (the cancel is broadcast to the whole pool).
    private void HandleCancel(CancelProductActorRequest cancel)
    {
        Console.WriteLine("Oh noes! I am being cancelled");

        // Collect first: the dictionary must not be modified while it is being enumerated.
        var doomed = new List<Guid>();
        foreach (var entry in _inFlight)
        {
            if (Equals(entry.Value.Item1, cancel.Sender))
            {
                doomed.Add(entry.Key);
            }
        }

        foreach (var partitionRef in doomed)
        {
            // Not disposed here: the worker task may still be observing the token.
            _inFlight[partitionRef].Item2.Cancel();
            _inFlight.Remove(partitionRef);
        }
    }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Topshelf;
namespace TestAkkaApp
{
/// <summary>
/// Process entry point: runs <see cref="TestActorService"/> under Topshelf.
/// </summary>
public class Program
{
    static void Main(string[] args)
    {
        HostFactory.Run(hostConfig =>
        {
            // Wire the service's lifecycle callbacks.
            hostConfig.Service<TestActorService>(serviceConfig =>
            {
                serviceConfig.ConstructUsing(ignored => new TestActorService());
                serviceConfig.WhenStarted(instance => instance.Start());
                serviceConfig.WhenStopped(instance => instance.Stop());
            });

            hostConfig.RunAsLocalService();
            hostConfig.UseAssemblyInfoForServiceInfo();
        });
    }
}
}
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
namespace TestAkkaApp
{
/// <summary>
/// ASP.NET Core startup class: registers MVC and adds it to the request pipeline.
/// </summary>
public class Startup
{
    /// <summary>Registers the MVC services in the container.</summary>
    public void ConfigureServices(IServiceCollection services) => services.AddMvc();

    /// <summary>Adds MVC to the request pipeline.</summary>
    public void Configure(IApplicationBuilder app) => app.UseMvc();
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Akka.Actor;
namespace ClassicTestAkkaApp
{
/// <summary>
/// Process-wide handles to the actor system and its top-level farmer actor.
/// Both are assigned by <c>TestActorService.Start</c> and read by the HTTP controller.
/// </summary>
public static class SystemActors
{
    /// <summary>The farmer actor that accepts factorial commands.</summary>
    public static IActorRef TestFarmerActor { get; set; }

    /// <summary>The actor system that hosts <see cref="TestFarmerActor"/>.</summary>
    public static ActorSystem ActorSystem { get; set; }
}
}
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Akka.Actor;
using ClassicTestAkkaApp;
using Microsoft.AspNetCore.Hosting;
namespace TestAkkaApp
{
/// <summary>
/// Service wrapper that owns the actor system and the Kestrel web host.
/// <see cref="Start"/> and <see cref="Stop"/> are invoked by the service host in Program.
/// </summary>
public class TestActorService
{
    private ActorSystem _actorSystem;
    private IWebHost _host;

    /// <summary>
    /// Creates the actor system, publishes it through <c>SystemActors</c>, and starts the web host.
    /// Returns as soon as the host is listening so the service can report itself as started.
    /// </summary>
    public void Start()
    {
        //this is where you setup your actor system and other things
        _actorSystem = ActorSystem.Create("TestSystem");
        var farmerActorRef = _actorSystem.ActorOf(Props.Create<TestFarmerActor>());

        // Publish the actors before the web host accepts requests that depend on them.
        SystemActors.TestFarmerActor = farmerActorRef;
        SystemActors.ActorSystem = _actorSystem;

        _host = new WebHostBuilder()
            .UseContentRoot(Directory.GetCurrentDirectory())
            .UseKestrel()
            .UseStartup<Startup>()
            .Build();

        // Start() does not block; Run() would keep this method from ever returning.
        _host.Start();
    }

    /// <summary>
    /// Stops the web host, then terminates the actor system and waits for it to finish.
    /// </summary>
    public void Stop()
    {
        // Stop taking HTTP requests first so nothing new reaches the actors.
        _host?.Dispose();
        _host = null;

        //this is where you stop your actor system
        // Block until termination completes: the caller expects a synchronous stop, and an
        // async void method would return (and hide failures) before the system is down.
        _actorSystem?.Terminate().Wait();
        _actorSystem = null;
    }
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Numerics;
using Akka.Actor;
using Akka.Routing;
using LanguageExt;
using static LanguageExt.Prelude;
namespace ClassicTestAkkaApp
{
/// <summary>
/// Tells the farmer actor to abandon a previously submitted factorial request.
/// </summary>
public class TestFarmerActorFactorialCancelCommand
{
    /// <summary>Identifier of the request to cancel.</summary>
    public Guid RequestId { get; private set; }

    /// <summary>Creates a command with an empty request id.</summary>
    public TestFarmerActorFactorialCancelCommand()
    {
    }

    /// <summary>Creates a command that cancels the request identified by <paramref name="request"/>.</summary>
    public TestFarmerActorFactorialCancelCommand(Guid request)
    {
        this.RequestId = request;
    }
}
/// <summary>
/// Asks the farmer actor to compute the factorial of a number.
/// </summary>
public class TestFarmerActorFactorialCommand
{
    /// <summary>The number whose factorial is requested.</summary>
    public BigInteger Number { get; private set; }

    /// <summary>Identifier the caller can later use to cancel this request.</summary>
    public Guid RequestId { get; private set; }

    /// <summary>Creates a command with default property values.</summary>
    public TestFarmerActorFactorialCommand()
    {
    }

    /// <summary>Creates a command for <paramref name="number"/> tagged with <paramref name="id"/>.</summary>
    public TestFarmerActorFactorialCommand(BigInteger number, Guid id)
    {
        this.Number = number;
        this.RequestId = id;
    }
}
/// <summary>
/// Coordinator actor: splits a factorial into partitions, farms them out to a pool of
/// <c>ProductActor</c> routees, multiplies the partial products together and replies to
/// the original requester once every partition has reported back.
/// </summary>
public class TestFarmerActor : ReceiveActor, ILogReceive
{
    // Number of routees in the pool; also the target number of partitions per request.
    private const int FarmSize = 100;

    // Bookkeeping for one factorial that is still being computed.
    private sealed class PendingFactorial
    {
        public PendingFactorial(Guid requestId, ISet<Guid> partitions)
        {
            RequestId = requestId;
            Partitions = partitions;
        }

        // Caller-supplied id, kept so the id map can be cleaned up without a reverse lookup.
        public Guid RequestId { get; }

        // Partitions that have not reported a result yet.
        public ISet<Guid> Partitions { get; }

        // Running product of the partitions received so far.
        public BigInteger Product { get; set; } = BigInteger.One;
    }

    private IActorRef _farm;
    private readonly IDictionary<IActorRef, PendingFactorial> _outstandingRequests = new Dictionary<IActorRef, PendingFactorial>();
    private readonly IDictionary<Guid, IActorRef> _requestIdToRef = new Dictionary<Guid, IActorRef>();

    protected override void PreStart()
    {
        _farm = Context.ActorOf(Props.Create<ProductActor>().WithRouter(new RoundRobinPool(FarmSize)));
    }

    public TestFarmerActor()
    {
        // Cancel: tell every routee to drop work for the requester and forget the request.
        Receive<TestFarmerActorFactorialCancelCommand>(cancel =>
        {
            Console.WriteLine("I am cancelling things");
            IActorRef actorRef;
            if (_requestIdToRef.TryGetValue(cancel.RequestId, out actorRef))
            {
                _farm.Tell(new Broadcast(new CancelProductActorRequest(actorRef)));
                _outstandingRequests.Remove(actorRef);
                _requestIdToRef.Remove(cancel.RequestId);
            }
        });

        Receive<TestFarmerActorFactorialCommand>(factorialCommand =>
        {
            Console.WriteLine($"Hello worlds. Today I am going to calculate {factorialCommand.Number} factorial");
            var number = factorialCommand.Number;

            // Neither case dispatches any partition, so without an explicit reply the
            // requester would wait until its own timeout.
            if (number.Sign < 0)
            {
                Sender.Tell(new Status.Failure(new ArgumentOutOfRangeException(nameof(factorialCommand.Number), "Factorial is not defined for negative numbers")));
                return;
            }
            if (number.IsZero)
            {
                Sender.Tell(BigInteger.One);
                return;
            }

            var stride = number < FarmSize ? number : number / FarmSize;
            var partitions = new HashSet<Guid>();
            for (var lower = BigInteger.One; lower <= number; lower += stride)
            {
                // Both bounds are inclusive, so a partition must end one short of the next
                // partition's start; otherwise the boundary value is multiplied twice.
                var upper = BigInteger.Min(lower + stride - 1, number);
                var partitionId = Guid.NewGuid();
                _farm.Tell(new ProductActorRequest(lower: lower, upper: upper, sender: Sender, partitionRef: partitionId));
                partitions.Add(partitionId);
            }

            _requestIdToRef[factorialCommand.RequestId] = Sender;
            _outstandingRequests[Sender] = new PendingFactorial(factorialCommand.RequestId, partitions);
        });

        Receive<ProductActorResponse>(response =>
        {
            PendingFactorial pending;
            if (response.Sender == null || !_outstandingRequests.TryGetValue(response.Sender, out pending))
            {
                // Late result for a request that was cancelled or already answered.
                Unhandled(response);
                return;
            }

            // Only count a partition once, even if its result is delivered again.
            if (pending.Partitions.Remove(response.PartitionRef))
            {
                pending.Product *= response.Result;
            }

            if (pending.Partitions.Count == 0)
            {
                _outstandingRequests.Remove(response.Sender);
                _requestIdToRef.Remove(pending.RequestId);
                response.Sender.Tell(pending.Product);
            }
        });
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment