-
-
Save rofr/d901fbdfd123f7fbd359 to your computer and use it in GitHub Desktop.
/// <summary>
/// Actor that receives batches of (command, reply-to) pairs, runs every
/// command through the kernel and sends each result to its reply-to actor.
/// </summary>
public class Executor : ReceiveActor
{
    readonly Kernel _kernel;

    /// <summary>
    /// Stores the kernel and registers the batch handler.
    /// </summary>
    /// <param name="kernel">Kernel used to execute every received command.</param>
    public Executor(Kernel kernel)
    {
        _kernel = kernel;
        Receive<Tuple<Command,ActorRef>[]>(ExecuteCommands);
    }

    /// <summary>
    /// Executes the commands of a batch in array order.
    /// </summary>
    /// <param name="tuples">Pairs of command and the actor waiting for its result.</param>
    /// <returns>Always <c>true</c>, i.e. the message is reported as handled.</returns>
    private bool ExecuteCommands(Tuple<Command,ActorRef>[] tuples)
    {
        for (var index = 0; index < tuples.Length; index++)
        {
            var command = tuples[index].Item1;
            var replyTo = tuples[index].Item2;

            // The reply goes back to the external caller and correlates with
            // the call to Ask<>() in Prevayler.ExecuteAsync().
            replyTo.Tell(_kernel.Execute(command), Context.Parent);
        }

        return true;
    }
}
/// <summary>
/// Append multiple commands accumulated during a specific time period or up
/// to a specific limit.
/// </summary>
/// <remarks>
/// Commands are buffered until either <see cref="BatchSize"/> commands have
/// been collected or a <c>ReceiveTimeout</c> message arrives. The buffered
/// batch is then appended to the "akka" stream and forwarded, as one array,
/// to the executor actor.
/// </remarks>
public class JournalWriter : ReceiveActor
{
    // Batch size used when the caller does not specify one.
    private const int DefaultBatchSize = 100;

    private readonly IEventStoreConnection _eventStore;

    // NOTE(review): currently unused because serialization in ToEventData is disabled.
    private readonly IFormatter _formatter;

    //number of commands at a time to journal
    public int BatchSize = DefaultBatchSize;

    //or after a specific time elapsed, whichever comes first
    public TimeSpan Interval;

    //buffered commands waiting to be written to the journal
    readonly List<Tuple<Command,ActorRef>> _commandBuffer = new List<Tuple<Command,ActorRef>>();

    //pass on the journaled commands to this actor
    readonly ActorRef _executor;

    // Fixed-size placeholder payload written for every event (see ToEventData).
    readonly byte[] _bytes = new byte[200];

    /// <summary>
    /// Creates a journal writer using the default batch size.
    /// </summary>
    /// <remarks>
    /// Provided as a real overload rather than an optional parameter so that
    /// <c>new JournalWriter(executor)</c> can be used inside an expression
    /// tree such as the one passed to <c>Props.Create</c>.
    /// </remarks>
    /// <param name="executor">Actor that receives each journaled batch.</param>
    public JournalWriter(ActorRef executor)
        : this(executor, DefaultBatchSize)
    {
    }

    /// <summary>
    /// Creates a journal writer with the given batch size and the default
    /// (zero) <see cref="Interval"/>.
    /// </summary>
    /// <param name="executor">Actor that receives each journaled batch.</param>
    /// <param name="batchSize">Number of buffered commands that triggers a write.</param>
    public JournalWriter(ActorRef executor, int batchSize)
        : this(executor, batchSize, default(TimeSpan))
    {
    }

    /// <summary>
    /// Creates a journal writer with an explicit batch size and receive timeout.
    /// </summary>
    /// <param name="executor">Actor that receives each journaled batch.</param>
    /// <param name="batchSize">Number of buffered commands that triggers a write.</param>
    /// <param name="interval">
    /// Value passed to <c>SetReceiveTimeout</c>. It has to be supplied here
    /// because the timeout is registered inside the constructor, before any
    /// caller could assign the <see cref="Interval"/> field.
    /// </param>
    public JournalWriter(ActorRef executor, int batchSize, TimeSpan interval)
    {
        BatchSize = batchSize;
        Interval = interval;
        _executor = executor;
        Receive<Tuple<Command, ActorRef>>(Accept);
        SetReceiveTimeout(Interval);
        Receive<ReceiveTimeout>(HandleTimeout);
        _eventStore = EventStoreConnection.Create(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113));
        // Blocks until the connect task has completed.
        _eventStore.ConnectAsync().Wait();
        _formatter = new BinaryFormatter();
    }

    /// <summary>
    /// Writes whatever is currently buffered when a receive timeout arrives.
    /// </summary>
    /// <returns>Always <c>true</c>, i.e. the message is reported as handled.</returns>
    public bool HandleTimeout(ReceiveTimeout _)
    {
        Go();
        return true;
    }

    /// <summary>
    /// Appends the buffered commands to the journal, forwards them to the
    /// executor and empties the buffer. Does nothing when the buffer is empty.
    /// </summary>
    private void Go()
    {
        if (_commandBuffer.Count > 0)
        {
            //Console.WriteLine("JOURNALER: Writing {0} commands", _commandBuffer.Count);
            // Blocks the actor until the append has completed, so commands are
            // only handed to the executor after they have been journaled.
            _eventStore.AppendToStreamAsync("akka", ExpectedVersion.Any,
                _commandBuffer.Select(ToEventData).ToArray()).Wait();

            //pass on for execution
            _executor.Tell(_commandBuffer.ToArray());
            _commandBuffer.Clear();
        }
    }

    /// <summary>
    /// Builds the event written to the journal for one buffered command.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the command itself is not serialized. Every event carries
    /// the same 200-byte placeholder buffer, so the journal cannot be replayed
    /// as written. Re-enable the serialization below before relying on it.
    /// </remarks>
    private EventData ToEventData(Tuple<Command, ActorRef> arg)
    {
        var id = Guid.NewGuid();
        //var stream = new MemoryStream();
        //_formatter.Serialize(stream, arg.Item1);
        return new EventData(id, "akka", false, _bytes, null);
    }

    /// <summary>
    /// Buffers a command and writes the batch once the limit is reached.
    /// </summary>
    /// <returns>Always <c>true</c>, i.e. the message is reported as handled.</returns>
    public bool Accept(Tuple<Command, ActorRef> command)
    {
        _commandBuffer.Add(command);

        // ">=" rather than "==": BatchSize is a public mutable field, so the
        // buffer may already be past the limit, in which case an equality
        // check would never match again.
        if (_commandBuffer.Count >= BatchSize) Go();
        return true;
    }

    /// <summary>
    /// Closes the event store connection when the actor stops.
    /// </summary>
    protected override void PostStop()
    {
        base.PostStop();
        _eventStore.Close();
        Console.WriteLine("PostStop called");
    }
}
/// <summary>
/// Executes 10000 AddItemCommand instances asynchronously against a fresh
/// Prevayler and prints the elapsed time.
/// </summary>
/// <param name="batchSize">
/// Batch size reported in the output. NOTE(review): the value is only
/// printed; it is not passed to the Prevayler constructed here.
/// </param>
[Test]
public void Smoke(int batchSize = 100)
{
    Console.WriteLine("Batch size: " + batchSize);

    // "using" guarantees the actor system is shut down even when
    // Task.WaitAll throws because a command failed.
    using (var prevayler = new Prevayler<List<string>>(new List<string>()))
    {
        // Started after construction so engine start-up is not measured.
        var sw = Stopwatch.StartNew();
        var tasks = Enumerable
            .Range(0, 10000)
            .Select(i => prevayler.ExecuteAsync(new AddItemCommand(i.ToString()))).ToArray();
        Task.WaitAll(tasks);
        sw.Stop();
        Console.WriteLine("async elapsed: " + sw.Elapsed);
    }
}
/// <summary>
/// Runs <c>Smoke</c> eight times, doubling the batch size on every run:
/// 10, 20, 40, ... 1280.
/// </summary>
[Test]
public void ProgressiveBatchSizes()
{
    for (var doublings = 0; doublings < 8; doublings++)
    {
        // 10 * 2^doublings, computed with integer arithmetic.
        Smoke(10 << doublings);
    }
}
/// <summary>
/// Prevalence engine
/// </summary>
/// <remarks>
/// Commands are sent to a dispatcher actor, which passes them to a journal
/// writer, which in turn passes journaled batches to an executor.
/// </remarks>
/// <typeparam name="M">Type of the model the commands operate on.</typeparam>
public class Prevayler<M> : IDisposable
{
    // Journal batch size used when the caller does not specify one.
    const int DefaultBatchSize = 100;

    readonly ActorSystem _actorSystem;
    readonly ActorRef _dispatcher;

    /// <summary>
    /// Creates an engine for <paramref name="model"/> using the default
    /// journal batch size.
    /// </summary>
    /// <param name="model">The model instance handed to the kernel.</param>
    public Prevayler(M model)
        : this(model, DefaultBatchSize)
    {
    }

    /// <summary>
    /// Creates an engine for <paramref name="model"/> with an explicit
    /// journal batch size.
    /// </summary>
    /// <param name="model">The model instance handed to the kernel.</param>
    /// <param name="batchSize">Batch size passed to the journal writer.</param>
    public Prevayler(M model, int batchSize)
    {
        // the kernel is an origodb component which
        // synchronizes reads and writes to the model
        // will be shared by command executor and query executor group
        var kernel = new Kernel(model);

        //build the chain of actors backwards
        _actorSystem = ActorSystem.Create("prevayler");

        //executor executes commands
        //it could also handle queries but would allow either a single query or command at time.
        //better to add a group of actors that can execute queries concurrently
        var executor = _actorSystem.ActorOf(Props.Create(() => new Executor(kernel)));

        //journaler writes commands to the journal in batches or at specific intervals
        //before passing to the executor
        // The batch size is passed explicitly: the JournalWriter constructor
        // takes (executor, batchSize).
        var journaler = _actorSystem.ActorOf(Props.Create(() => new JournalWriter(executor, batchSize)));

        //dispatcher prepares initial message and passes to journaler
        _dispatcher = _actorSystem.ActorOf(Props.Create(() => new Dispatcher(journaler)));
    }

    /// <summary>
    /// Sends a command to the dispatcher and asynchronously returns its result.
    /// </summary>
    public async Task<R> ExecuteAsync<R>(Command<M,R> command)
    {
        return await _dispatcher.Ask<R>(command);
    }

    /// <summary>
    /// Sends a command to the dispatcher and completes when a reply arrives.
    /// </summary>
    public async Task ExecuteAsync(Command<M> command)
    {
        await _dispatcher.Ask(command);
    }

    /// <summary>
    /// Blocking variant of <see cref="ExecuteAsync{R}(Command{M,R})"/>.
    /// </summary>
    public R Execute<R>(Command<M, R> command)
    {
        return ExecuteAsync(command).Result;
    }

    /// <summary>
    /// Blocking variant of <see cref="ExecuteAsync(Command{M})"/>.
    /// </summary>
    public void Execute(Command<M> command)
    {
        ExecuteAsync(command).Wait();
    }

    /// <summary>
    /// Shuts the actor system down and waits for the shutdown to finish.
    /// </summary>
    public void Dispose()
    {
        _actorSystem.Shutdown();
        _actorSystem.WaitForShutdown();
    }
}
Batch size: 10 | |
async elapsed: 00:00:03.1852693 | |
Batch size: 20 | |
PostStop called | |
async elapsed: 00:00:01.0519764 | |
Batch size: 40 | |
PostStop called | |
async elapsed: 00:00:00.7820753 | |
Batch size: 80 | |
PostStop called | |
async elapsed: 00:00:00.8452527 | |
Batch size: 160 | |
PostStop called | |
async elapsed: 00:00:00.8752412 | |
Batch size: 320 | |
PostStop called | |
async elapsed: 00:00:00.8854354 | |
Batch size: 640 | |
PostStop called | |
[INFO][2014-10-15 11:30:53][Thread 0029][akka://prevayler/deadLetters] Message DeathWatchNotification from akka://prevayler/deadLetters to akka://prevayler/deadLetters was not delivered. 1 dead letters encountered. | |
async elapsed: 00:00:00.9683153 | |
Batch size: 1280 | |
PostStop called | |
async elapsed: 00:00:00.8868212 | |
PostStop called |
No errors as far as I can see, only some minor things.
I'd simplify Executor:
/// <summary>
/// Simplified executor: the batch handler is registered inline as a
/// void-returning lambda instead of a separate bool-returning method.
/// </summary>
public class Executor : ReceiveActor
{
    readonly Kernel _kernel;

    public Executor(Kernel kernel)
    {
        _kernel = kernel;

        // A lambda with a statement body needs braces around the body.
        Receive<Tuple<Command,ActorRef>[]>(tuples =>
        {
            foreach (var tuple in tuples)
            {
                var result = _kernel.Execute(tuple.Item1);
                //send a return message to the external caller
                // will correlate with the call to Ask<>() in Prevayler.ExecuteAsync()
                tuple.Item2.Tell(result, Context.Parent);
            }
        });
    }
}
You don't need to return anything from your handlers, so you can make them all return void.
The handler overloads returning a bool are for when you need to inspect the message in your handler to decide whether you will handle it or not. Since you always handle the messages, use the Action overload.
Instead of
Console.WriteLine("PostStop called");
insert this at top of class:
private readonly LoggingAdapter _logger = Logging.GetLogger(Context);
// In next version you'll be able to write:
private readonly LoggingAdapter _logger = Context.GetLogger();
And then use
_logger.Debug("PostStop called");
You will not be able to reach the public fields in JournalWriter, so if you want to set those, you have to do so through the constructor (or by sending it a message).
Thanks for the feedback, you guys rock!
I'd also consider not waiting for the async task inside the actor (Go). Instead I'd use PipeTo to send myself the result (or, if the task doesn't have any result, use await and send myself a message) and use Become(StateWaitForEventStore, false) to switch to a state where I can only receive the result message (with a ReceiveAny(_ => Stash.Stash())). Then, when I have the result, I can call the executor, clear the buffer, and call Stash.UnstashAll() followed by Unbecome() to switch back to the original state.
Only problem is that stashing comes in next nuget release (the source contains this feature).
On the other hand you don't need stashing if you want to continue to support all messages.
I would consider rethinking this bit here:
_eventStore.AppendToStreamAsync("akka", ExpectedVersion.Any,
_commandBuffer.Select(ToEventData).ToArray()).Wait();
- you probably don't want to wait in this code
- You probably want to use expected versions: if you track the version in here, we will guarantee idempotency for you! How badass would that be? :)
Is there any way to get notifications of new comments here?
Looks good.
I would have used a typed class for the messages instead of the Tuple<..>, but that's just cosmetics.
In the two ExecuteAsync methods, you can remove the async modifier and just return the result of the Ask to avoid creating extra task objects.
The deadLetter notifications you see are most likely due to messages arriving at actors of a system that has already been shut down; that is, the mailbox is already scheduled for execution when the system shuts down, and it is run too late.