Skip to content

Instantly share code, notes, and snippets.

@rofr
Last active January 5, 2017 11:30
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rofr/d901fbdfd123f7fbd359 to your computer and use it in GitHub Desktop.
Save rofr/d901fbdfd123f7fbd359 to your computer and use it in GitHub Desktop.
Spiking an async origodb engine using akka.net and EventStore v3
public class Executor : ReceiveActor
{
readonly Kernel _kernel;
public Executor(Kernel kernel)
{
_kernel = kernel;
Receive<Tuple<Command,ActorRef>[]>(ExecuteCommands);
}
private bool ExecuteCommands(Tuple<Command,ActorRef>[] tuples)
{
foreach (var tuple in tuples)
{
var result = _kernel.Execute(tuple.Item1);
//send a return message to the external caller
// will correlate with the call to Ask<>() in Prevayler.ExecuteAsync()
tuple.Item2.Tell(result, Context.Parent);
}
return true;
}
}
/// <summary>
/// Append multiple commands accumulated during a specific time period or up
/// to a specific limit.
/// </summary>
public class JournalWriter : ReceiveActor
{
private readonly IEventStoreConnection _eventStore;
private readonly IFormatter _formatter;
//number of commands at a time to journal
public int BatchSize = 100;
//or after a specific time elapsed, whichever comes first
public TimeSpan Interval;
//buffered commands waiting to be written to the journal
readonly List<Tuple<Command,ActorRef>> _commandBuffer = new List<Tuple<Command,ActorRef>>();
//pass on the journaled commands to this actor
readonly ActorRef _executor;
public JournalWriter(ActorRef executor, int batchSize)
{
BatchSize = batchSize;
_executor = executor;
Receive<Tuple<Command, ActorRef>>(Accept);
SetReceiveTimeout(Interval);
Receive<ReceiveTimeout>(HandleTimeout);
_eventStore = EventStoreConnection.Create(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113));
_eventStore.ConnectAsync().Wait();
_formatter = new BinaryFormatter();
}
public bool HandleTimeout(ReceiveTimeout _)
{
Go();
return true;
}
private void Go()
{
if (_commandBuffer.Count > 0)
{
//Console.WriteLine("JOURNALER: Writing {0} commands", _commandBuffer.Count);
_eventStore.AppendToStreamAsync("akka", ExpectedVersion.Any,
_commandBuffer.Select(ToEventData).ToArray()).Wait();
//pass on for execution
_executor.Tell(_commandBuffer.ToArray());
_commandBuffer.Clear();
}
}
byte[] _bytes = new byte[200];
private EventData ToEventData(Tuple<Command, ActorRef> arg)
{
var id = Guid.NewGuid();
//var stream = new MemoryStream();
//_formatter.Serialize(stream, arg.Item1);
return new EventData(id, "akka", false, _bytes, null);
}
public bool Accept(Tuple<Command, ActorRef> command)
{
_commandBuffer.Add(command);
if (_commandBuffer.Count == BatchSize) Go();
return true;
}
protected override void PostStop()
{
base.PostStop();
_eventStore.Close();
Console.WriteLine("PostStop called");
}
}
[Test]
public void Smoke(int batchSize = 100)
{
Console.WriteLine("Batch size: " + batchSize);
var sw = new Stopwatch();
var prevayler = new Prevayler<List<string>>(new List<string>());
sw.Start();
var tasks = Enumerable
.Range(0, 10000)
.Select(i => prevayler.ExecuteAsync(new AddItemCommand(i.ToString()))).ToArray();
Task.WaitAll(tasks);
sw.Stop();
Console.WriteLine("async elapsed: " + sw.Elapsed);
prevayler.Dispose();
}
[Test]
public void ProgressiveBatchSizes()
{
foreach (var batchSize in Enumerable.Range(0,8).Select(i => 10 * Math.Pow(2, i)))
{
Smoke((int)batchSize);
}
}
/// <summary>
/// Prevalence engine
/// </summary>
/// <typeparam name="M"></typeparam>
public class Prevayler<M> : IDisposable
{
readonly ActorSystem _actorSystem;
readonly ActorRef _dispatcher;
public Prevayler(M model)
{
// the kernel is an origodb component which
// synchronizes reads and writes to the model
// will be shared by command executor and query executor group
var kernel = new Kernel(model);
//build the chain of actors backwards
_actorSystem = ActorSystem.Create("prevayler");
//executor executes commands
//it could also handle queries but would allow either a single query or command at time.
//better to add a group of actors that can execute queries concurrently
var executor = _actorSystem.ActorOf(Props.Create(() => new Executor(kernel)));
//journaler writes commands to the journal in batches or at specific intervals
//before passing to the executor
var journaler = _actorSystem.ActorOf(Props.Create(() => new JournalWriter(executor)));
//dispatcher prepares initial message and passes to journaler
_dispatcher = _actorSystem.ActorOf(Props.Create(() => new Dispatcher(journaler)));
}
public async Task<R> ExecuteAsync<R>(Command<M,R> command)
{
return await _dispatcher.Ask<R>(command);
}
public async Task ExecuteAsync(Command<M> command)
{
await _dispatcher.Ask(command);
}
public R Execute<R>(Command<M, R> command)
{
return ExecuteAsync(command).Result;
}
public void Execute(Command<M> command)
{
ExecuteAsync(command).Wait();
}
public void Dispose()
{
_actorSystem.Shutdown();
_actorSystem.WaitForShutdown();
}
}
Batch size: 10
async elapsed: 00:00:03.1852693
Batch size: 20
PostStop called
async elapsed: 00:00:01.0519764
Batch size: 40
PostStop called
async elapsed: 00:00:00.7820753
Batch size: 80
PostStop called
async elapsed: 00:00:00.8452527
Batch size: 160
PostStop called
async elapsed: 00:00:00.8752412
Batch size: 320
PostStop called
async elapsed: 00:00:00.8854354
Batch size: 640
PostStop called
[INFO][2014-10-15 11:30:53][Thread 0029][akka://prevayler/deadLetters] Message DeathWatchNotification from akka://prevayler/deadLetters to akka://prevayler/deadLetters was not delivered. 1 dead letters encountered.
async elapsed: 00:00:00.9683153
Batch size: 1280
PostStop called
async elapsed: 00:00:00.8868212
PostStop called
@rogeralsing
Copy link

Looks good.
I would have used a typed class for the messages instead of the Tuple<..> but thats just cosmetics.
In the two ExecuteAsync methods, you can remove the async modifier and just return the result of the Ask to avoid creacting extra task objects.

The deadLetter notifications you see is most likely due to messages arriving to actors of a system that has been closed, that is, the mailbox is already scheduled for execution when the system shuts down. and runs the mailbox too late.

@HCanber
Copy link

HCanber commented Oct 15, 2014

No errors or so as I can see. Some minor things only

I'd simplify Executor.

   public class Executor : ReceiveActor
   {
        public Executor(Kernel kernel)
        {
            Receive<Tuple<Command,ActorRef>[]>(tuples=>
              foreach (var tuple in tuples)
              {
                  var result = _kernel.Execute(tuple.Item1);

                  //send a return message to the external caller
                  // will correlate with the call to Ask<>() in Prevayler.ExecuteAsync()
                  tuple.Item2.Tell(result, Context.Parent);
               }
            );
        } 
    }

You don't need to return anything from your handlers, so you can make them all return void.
Then handlers returning a bool are used for when you need to investigate the message in your handler to see if you will handle it or not. Since you always handle the messages use the Action overload.

Instead of

Console.WriteLine("PostStop called");

insert this at top of class:

private readonly LoggingAdapter _logger = Logging.GetLogger(Context);`
// In next version you'll be able to write:
private readonly LoggingAdapter _logger = Context.GetLogger();

And then use

_logger.Debug("PostStop called");

You will not be able to reach the public fields in JournalWriter, so if you want to set those, you have to do so thru the constructor (or by sending it a message).

@rofr
Copy link
Author

rofr commented Oct 15, 2014

Thanks for the feedback, you guys rock!

@HCanber
Copy link

HCanber commented Oct 15, 2014

I'd also consider not waiting for the async task inside the actor (Go). Instead I'd use PipeTo to send myself the result (or if the task doesn't have any result, use await and send myself a message) and use Become(StateWaitForEventStore, false) to switch to a state where I only can receive the result message (with a ReceiveAny(_ => Stash.Stash())) and then when I had the result I can call the excecutor, clear the buffer and then Stash.UnstashAll() followed by Unbecome() to switch back to the original state.

Only problem is that stashing comes in next nuget release (the source contains this feature).

@HCanber
Copy link

HCanber commented Oct 15, 2014

On the other hand you don't need stashing if you want to continue to support all messages.

@gregoryyoung
Copy link

I would consider rethinking this bit here:

            _eventStore.AppendToStreamAsync("akka", ExpectedVersion.Any,
                _commandBuffer.Select(ToEventData).ToArray()).Wait();
  1. you probably don't want to wait in this code
  2. You probably want to use expected versions as if you track this in here we will guarantee idempotency for you! how badass would that be? :)

@rofr
Copy link
Author

rofr commented Oct 16, 2014

Is there anyway to get notifications of new comments here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment