Skip to content

Instantly share code, notes, and snippets.

@rofr
Last active January 5, 2017 11:30
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rofr/d901fbdfd123f7fbd359 to your computer and use it in GitHub Desktop.
Save rofr/d901fbdfd123f7fbd359 to your computer and use it in GitHub Desktop.
Spiking an async origodb engine using akka.net and EventStore v3
/// <summary>
/// Actor that runs batches of commands against the kernel and sends each
/// result back to the actor reference that accompanies the command.
/// </summary>
public class Executor : ReceiveActor
{
    readonly Kernel _kernel;

    /// <summary>
    /// Creates the executor and registers the handler for command batches.
    /// </summary>
    /// <param name="kernel">Kernel used to execute every received command.</param>
    public Executor(Kernel kernel)
    {
        _kernel = kernel;
        Receive<Tuple<Command,ActorRef>[]>(ExecuteCommands);
    }

    /// <summary>
    /// Executes every command of the batch in array order and replies to the
    /// paired actor reference with the result.
    /// </summary>
    /// <param name="tuples">Pairs of a command and the actor that receives its result.</param>
    /// <returns>Always <c>true</c>.</returns>
    private bool ExecuteCommands(Tuple<Command,ActorRef>[] tuples)
    {
        for (var index = 0; index < tuples.Length; index++)
        {
            var command = tuples[index].Item1;
            var replyTo = tuples[index].Item2;

            // The reply is the return message for the external caller; it is
            // meant to correlate with the Ask<>() call in Prevayler.ExecuteAsync().
            // The parent actor is passed as the sender of the reply.
            replyTo.Tell(_kernel.Execute(command), Context.Parent);
        }

        return true;
    }
}
/// <summary>
/// Appends commands to the journal in batches. A batch is written once
/// <see cref="BatchSize"/> commands have accumulated, or when the receive
/// timeout configured from <see cref="Interval"/> fires, whichever comes
/// first. Each journaled batch is then passed on to the executor actor.
/// </summary>
public class JournalWriter : ReceiveActor
{
    private readonly IEventStoreConnection _eventStore;

    // Currently unused: the serialization in ToEventData is not wired up yet.
    private readonly IFormatter _formatter;

    //number of commands at a time to journal
    public int BatchSize = 100;

    //or after a specific time elapsed, whichever comes first.
    //Initialized here because the constructor reads this field before any
    //caller has a chance to assign it; previously the receive timeout was
    //always configured with default(TimeSpan). The value itself is an
    //arbitrary default, tune as needed.
    public TimeSpan Interval = TimeSpan.FromMilliseconds(10);

    //buffered commands waiting to be written to the journal
    readonly List<Tuple<Command,ActorRef>> _commandBuffer = new List<Tuple<Command,ActorRef>>();

    //pass on the journaled commands to this actor
    readonly ActorRef _executor;

    // Placeholder payload written for every event (see ToEventData).
    readonly byte[] _bytes = new byte[200];

    /// <summary>
    /// Registers the message handlers, configures the receive timeout and
    /// opens the event store connection (blocking until connected).
    /// </summary>
    /// <param name="executor">Actor that receives each batch after it has been journaled.</param>
    /// <param name="batchSize">Number of buffered commands that triggers a journal write.</param>
    public JournalWriter(ActorRef executor, int batchSize)
    {
        BatchSize = batchSize;
        _executor = executor;
        Receive<Tuple<Command, ActorRef>>(Accept);
        SetReceiveTimeout(Interval);
        Receive<ReceiveTimeout>(HandleTimeout);
        _eventStore = EventStoreConnection.Create(new IPEndPoint(IPAddress.Parse("127.0.0.1"), 1113));
        _eventStore.ConnectAsync().Wait();
        _formatter = new BinaryFormatter();
    }

    /// <summary>
    /// Flushes whatever is buffered when the receive timeout fires.
    /// </summary>
    /// <returns>Always <c>true</c>.</returns>
    public bool HandleTimeout(ReceiveTimeout _)
    {
        Go();
        return true;
    }

    /// <summary>
    /// Writes all buffered commands to the "akka" stream, forwards them to the
    /// executor and empties the buffer. Does nothing when the buffer is empty.
    /// </summary>
    private void Go()
    {
        if (_commandBuffer.Count == 0)
        {
            return;
        }

        // Snapshot once so the journaled batch and the forwarded batch are the same array.
        var batch = _commandBuffer.ToArray();

        // Blocks until the append completes, so a batch is only handed to the
        // executor after the write has finished. If the append fails the
        // exception propagates and the buffer is left untouched.
        _eventStore.AppendToStreamAsync("akka", ExpectedVersion.Any,
            batch.Select(ToEventData).ToArray()).Wait();

        //pass on for execution
        _executor.Tell(batch);
        _commandBuffer.Clear();
    }

    /// <summary>
    /// Builds the event written for one buffered command.
    /// </summary>
    /// <remarks>
    /// TODO: the command itself is not serialized yet; every event carries the
    /// same fixed 200-byte placeholder payload and no metadata.
    /// </remarks>
    private EventData ToEventData(Tuple<Command, ActorRef> arg)
    {
        var id = Guid.NewGuid();
        return new EventData(id, "akka", false, _bytes, null);
    }

    /// <summary>
    /// Buffers an incoming command and flushes once the batch size is reached.
    /// </summary>
    /// <param name="command">The command together with the actor awaiting its result.</param>
    /// <returns>Always <c>true</c>.</returns>
    public bool Accept(Tuple<Command, ActorRef> command)
    {
        _commandBuffer.Add(command);

        // ">=" rather than "==": BatchSize is a public mutable field, so the
        // buffer may already be larger than a newly assigned, smaller value.
        if (_commandBuffer.Count >= BatchSize)
        {
            Go();
        }

        return true;
    }

    /// <summary>
    /// Closes the event store connection when the actor stops.
    /// </summary>
    protected override void PostStop()
    {
        base.PostStop();
        _eventStore.Close();
        Console.WriteLine("PostStop called");
    }
}
/// <summary>
/// Fires 10000 asynchronous AddItemCommand executions at a fresh engine,
/// waits for all of them and prints the elapsed time.
/// </summary>
/// <param name="batchSize">
/// Batch size label printed before the run.
/// NOTE(review): the value is only logged here; it is not passed to the
/// Prevayler constructor, so confirm how the batch size is meant to reach
/// the journal writer.
/// </param>
[Test]
public void Smoke(int batchSize = 100)
{
    Console.WriteLine("Batch size: " + batchSize);
    var prevayler = new Prevayler<List<string>>(new List<string>());
    try
    {
        var sw = Stopwatch.StartNew();
        var tasks = Enumerable
            .Range(0, 10000)
            .Select(i => prevayler.ExecuteAsync(new AddItemCommand(i.ToString())))
            .ToArray();
        Task.WaitAll(tasks);
        sw.Stop();
        Console.WriteLine("async elapsed: " + sw.Elapsed);
    }
    finally
    {
        // Dispose even when a command fails, otherwise the actor system
        // created by the engine is left running after the test.
        prevayler.Dispose();
    }
}
/// <summary>
/// Runs the smoke test eight times, doubling the batch size each time:
/// 10, 20, 40, ... up to 1280.
/// </summary>
[Test]
public void ProgressiveBatchSizes()
{
    for (var exponent = 0; exponent < 8; exponent++)
    {
        // 10 * 2^exponent, computed with an integer shift.
        Smoke(10 << exponent);
    }
}
/// <summary>
/// Prevalence engine. Commands are sent to a dispatcher actor, which passes
/// them to a journal writer, which in turn hands journaled batches to an
/// executor that applies them to the model through the kernel.
/// </summary>
/// <typeparam name="M">Type of the model the commands operate on.</typeparam>
public class Prevayler<M> : IDisposable
{
    readonly ActorSystem _actorSystem;
    readonly ActorRef _dispatcher;

    /// <summary>
    /// Creates an engine for the given model with a journal batch size of 100.
    /// </summary>
    /// <param name="model">The model instance wrapped by the kernel.</param>
    public Prevayler(M model) : this(model, 100)
    {
    }

    /// <summary>
    /// Creates an engine for the given model.
    /// </summary>
    /// <param name="model">The model instance wrapped by the kernel.</param>
    /// <param name="batchSize">Number of commands the journal writer buffers before writing.</param>
    public Prevayler(M model, int batchSize)
    {
        // the kernel is an origodb component which
        // synchronizes reads and writes to the model
        // will be shared by command executor and query executor group
        var kernel = new Kernel(model);

        //build the chain of actors backwards
        _actorSystem = ActorSystem.Create("prevayler");

        //executor executes commands
        //it could also handle queries but would allow either a single query or command at time.
        //better to add a group of actors that can execute queries concurrently
        var executor = _actorSystem.ActorOf(Props.Create(() => new Executor(kernel)));

        //journaler writes commands to the journal in batches or at specific intervals
        //before passing to the executor.
        //JournalWriter's constructor requires the batch size, so it is passed explicitly.
        var journaler = _actorSystem.ActorOf(Props.Create(() => new JournalWriter(executor, batchSize)));

        //dispatcher prepares initial message and passes to journaler
        _dispatcher = _actorSystem.ActorOf(Props.Create(() => new Dispatcher(journaler)));
    }

    /// <summary>
    /// Sends a command to the dispatcher and asynchronously returns its result.
    /// </summary>
    /// <typeparam name="R">Type of the command result.</typeparam>
    /// <param name="command">The command to execute.</param>
    /// <returns>A task that completes with the reply to the command.</returns>
    public async Task<R> ExecuteAsync<R>(Command<M,R> command)
    {
        return await _dispatcher.Ask<R>(command);
    }

    /// <summary>
    /// Sends a command without a typed result to the dispatcher and completes
    /// when a reply arrives.
    /// </summary>
    /// <param name="command">The command to execute.</param>
    public async Task ExecuteAsync(Command<M> command)
    {
        await _dispatcher.Ask(command);
    }

    /// <summary>
    /// Blocking variant of <see cref="ExecuteAsync{R}(Command{M,R})"/>.
    /// </summary>
    /// <typeparam name="R">Type of the command result.</typeparam>
    /// <param name="command">The command to execute.</param>
    /// <returns>The reply to the command.</returns>
    public R Execute<R>(Command<M, R> command)
    {
        return ExecuteAsync(command).Result;
    }

    /// <summary>
    /// Blocking variant of <see cref="ExecuteAsync(Command{M})"/>.
    /// </summary>
    /// <param name="command">The command to execute.</param>
    public void Execute(Command<M> command)
    {
        ExecuteAsync(command).Wait();
    }

    /// <summary>
    /// Shuts the actor system down and waits for the shutdown to finish.
    /// </summary>
    public void Dispose()
    {
        _actorSystem.Shutdown();
        _actorSystem.WaitForShutdown();
    }
}
Batch size: 10
async elapsed: 00:00:03.1852693
Batch size: 20
PostStop called
async elapsed: 00:00:01.0519764
Batch size: 40
PostStop called
async elapsed: 00:00:00.7820753
Batch size: 80
PostStop called
async elapsed: 00:00:00.8452527
Batch size: 160
PostStop called
async elapsed: 00:00:00.8752412
Batch size: 320
PostStop called
async elapsed: 00:00:00.8854354
Batch size: 640
PostStop called
[INFO][2014-10-15 11:30:53][Thread 0029][akka://prevayler/deadLetters] Message DeathWatchNotification from akka://prevayler/deadLetters to akka://prevayler/deadLetters was not delivered. 1 dead letters encountered.
async elapsed: 00:00:00.9683153
Batch size: 1280
PostStop called
async elapsed: 00:00:00.8868212
PostStop called
@HCanber
Copy link

HCanber commented Oct 15, 2014

I'd also consider not waiting for the async task inside the actor (in Go). Instead, I'd use PipeTo to send myself the result (or, if the task doesn't have a result, use await and send myself a message) and use Become(StateWaitForEventStore, false) to switch to a state where I can only receive the result message (with a ReceiveAny(_ => Stash.Stash()) for everything else). Once I have the result, I can call the executor, clear the buffer, and then call Stash.UnstashAll() followed by Unbecome() to switch back to the original state.

Only problem is that stashing comes in next nuget release (the source contains this feature).

@HCanber
Copy link

HCanber commented Oct 15, 2014

On the other hand you don't need stashing if you want to continue to support all messages.

@gregoryyoung
Copy link

I would consider rethinking this bit here:

            _eventStore.AppendToStreamAsync("akka", ExpectedVersion.Any,
                _commandBuffer.Select(ToEventData).ToArray()).Wait();
  1. you probably don't want to wait in this code
  2. You probably want to use expected versions as if you track this in here we will guarantee idempotency for you! how badass would that be? :)

@rofr
Copy link
Author

rofr commented Oct 16, 2014

Is there any way to get notifications of new comments here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment