Skip to content

Instantly share code, notes, and snippets.

@DamianReeves
Created October 24, 2015 11:09
Show Gist options
  • Save DamianReeves/4115d7224851edb33504 to your computer and use it in GitHub Desktop.
Save DamianReeves/4115d7224851edb33504 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Akka.Persistence.Sandbox
{
using Akka.Actor;
public class MyPersistenceExtension: IExtension
{
public MyPersistenceExtension(ExtendedActorSystem system)
{
}
}
public class MyPersistence: ExtensionIdProvider<MyPersistenceExtension>
{
public override MyPersistenceExtension CreateExtension(ExtendedActorSystem system)
{
return new MyPersistenceExtension(system);
}
}
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace Akka.Persistence.Sandbox.Journal
{
using System.Collections.Concurrent;
using Akka.Persistence.Journal;
public class MyJournal : SyncWriteJournal
{
private static readonly object Ack = new object();
private readonly ConcurrentDictionary<string, ISet<IPersistentRepresentation>> _eventStreams =
new ConcurrentDictionary<string, ISet<IPersistentRepresentation>>();
public override void DeleteMessagesTo(string persistenceId, long toSequenceNr, bool isPermanent)
{
ISet<IPersistentRepresentation> eventStream;
if (_eventStreams.TryGetValue(persistenceId, out eventStream))
{
foreach (var message in eventStream.ToArray())
{
if (message.SequenceNr <= toSequenceNr)
{
eventStream.Remove(message);
if (!isPermanent)
{
// copy message with IsDeleted flag set
var copy = message.Update(message.SequenceNr,
message.PersistenceId, isDeleted: true, sender: message.Sender);
eventStream.Add(copy);
}
}
// messages are already stored by their sequence number order
else break;
}
}
}
public override Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr)
{
ISet<IPersistentRepresentation> eventStream;
if (_eventStreams.TryGetValue(persistenceId, out eventStream))
{
var last = eventStream.LastOrDefault();
var seqNr = last == null ? 0L : last.SequenceNr;
return Task.FromResult(seqNr);
}
return Task.FromResult(0L);
}
public override Task ReplayMessagesAsync(string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action<IPersistentRepresentation> replayCallback)
{
var dispatcher = Context.System.Dispatchers.DefaultGlobalDispatcher;
var promise = new TaskCompletionSource<object>();
dispatcher.Schedule(() =>
{
try
{
Replay(persistenceId, fromSequenceNr, toSequenceNr, max, replayCallback);
promise.SetResult(Ack);
}
catch (Exception e)
{
promise.SetException(e);
}
});
return promise.Task;
}
public override void WriteMessages(IEnumerable<IPersistentRepresentation> messages)
{
foreach (var message in messages)
{
var list = _eventStreams.GetOrAdd(message.PersistenceId,
new SortedSet<IPersistentRepresentation>(PersistentComparer.Instance));
list.Add(message);
}
}
private void Replay(string pid, long from, long to, long max, Action<IPersistentRepresentation> replay)
{
ISet<IPersistentRepresentation> eventStream;
if (_eventStreams.TryGetValue(pid, out eventStream))
{
var s = eventStream as IEnumerable<IPersistentRepresentation>;
if (from > 0) s = s.Where(p => p.SequenceNr >= from);
if (to < int.MaxValue) s = s.Where(p => p.SequenceNr <= to);
if (max < eventStream.Count) s = s.Take((int)max);
foreach (var persistent in s)
{
replay(persistent);
}
}
}
}
}
namespace Akka.Persistence.Sandbox.Journal
{
using Configuration;
using Akka.Persistence.TestKit.Journal;
using Xunit;
using Actor;
using FluentAssertions;
public class MyJournalSpec : JournalSpec
{
private static readonly Config SpecConfig;
static MyJournalSpec()
{
SpecConfig = ConfigurationFactory.ParseString(@"
akka.persistence {
publish-plugin-commands = on
journal {
plugin = ""akka.persistence.journal.my""
my {
class = ""Akka.Persistence.Sandbox.Journal.MyJournal, Akka.Persistence.Sandbox""
plugin-dispatcher = ""akka.actor.default-dispatcher""
}
}
}");
}
public MyJournalSpec() : base(SpecConfig, "MyJournalSpec") { }
[Fact]
public void Journal_should_not_be_null()
{
this.Journal.Should().NotBeNull();
}
}
}
namespace Akka.Persistence.Sandbox.Journal
{
using System.Collections.Generic;
internal class PersistentComparer: Comparer<IPersistentRepresentation>
{
public static PersistentComparer Instance = new PersistentComparer();
public override int Compare(IPersistentRepresentation x, IPersistentRepresentation y)
{
return Comparer<long>.Default.Compare(x.SequenceNr, y.SequenceNr);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment