Created
October 24, 2015 11:09
-
-
Save DamianReeves/4115d7224851edb33504 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
namespace Akka.Persistence.Sandbox | |
{ | |
using Akka.Actor; | |
/// <summary>
/// Actor-system extension for the persistence sandbox.
/// It currently carries no state: the constructor accepts the owning
/// system but does not retain or use it.
/// </summary>
public class MyPersistenceExtension : IExtension
{
    /// <summary>
    /// Creates the extension for the given actor system.
    /// </summary>
    /// <param name="system">The actor system this extension is created for (unused).</param>
    public MyPersistenceExtension(ExtendedActorSystem system)
    {
        // Intentionally empty: nothing to initialise yet.
    }
}
/// <summary>
/// Extension id provider that builds <see cref="MyPersistenceExtension"/> instances.
/// </summary>
public class MyPersistence : ExtensionIdProvider<MyPersistenceExtension>
{
    /// <summary>
    /// Builds a new <see cref="MyPersistenceExtension"/> bound to <paramref name="system"/>.
    /// </summary>
    /// <param name="system">The actor system requesting the extension.</param>
    /// <returns>A freshly constructed extension instance.</returns>
    public override MyPersistenceExtension CreateExtension(ExtendedActorSystem system)
    {
        var extension = new MyPersistenceExtension(system);
        return extension;
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Text; | |
using System.Threading.Tasks; | |
namespace Akka.Persistence.Sandbox.Journal | |
{ | |
using System.Collections.Concurrent; | |
using Akka.Persistence.Journal; | |
/// <summary>
/// In-memory journal that keeps one event stream per persistence id,
/// ordered by sequence number (see <see cref="PersistentComparer"/>).
/// </summary>
public class MyJournal : SyncWriteJournal
{
    // Dummy result used to complete the replay task.
    private static readonly object Ack = new object();

    // One sorted stream per persistence id. Each stream is also used as its own
    // lock object: SortedSet is not thread-safe and Replay is handed to a
    // dispatcher, so it may run while the journal writes or deletes.
    private readonly ConcurrentDictionary<string, SortedSet<IPersistentRepresentation>> _eventStreams =
        new ConcurrentDictionary<string, SortedSet<IPersistentRepresentation>>();

    /// <summary>
    /// Deletes every message of <paramref name="persistenceId"/> whose sequence number is
    /// less than or equal to <paramref name="toSequenceNr"/>.
    /// </summary>
    /// <param name="persistenceId">Id of the stream to delete from; unknown ids are ignored.</param>
    /// <param name="toSequenceNr">Inclusive upper bound of the sequence numbers to delete.</param>
    /// <param name="isPermanent">
    /// When <c>true</c> the messages are removed; otherwise each one is replaced by a copy
    /// with its deleted flag set.
    /// </param>
    public override void DeleteMessagesTo(string persistenceId, long toSequenceNr, bool isPermanent)
    {
        SortedSet<IPersistentRepresentation> eventStream;
        if (!_eventStreams.TryGetValue(persistenceId, out eventStream))
        {
            return;
        }

        lock (eventStream)
        {
            // Iterate over a copy because the set is mutated inside the loop.
            foreach (var message in eventStream.ToArray())
            {
                // Messages are stored in sequence number order, so the first one
                // past the bound ends the scan.
                if (message.SequenceNr > toSequenceNr)
                {
                    break;
                }

                eventStream.Remove(message);
                if (!isPermanent)
                {
                    // Copy the message with the IsDeleted flag set and put it back.
                    var copy = message.Update(message.SequenceNr,
                        message.PersistenceId, isDeleted: true, sender: message.Sender);
                    eventStream.Add(copy);
                }
            }
        }
    }

    /// <summary>
    /// Returns the highest sequence number currently stored for <paramref name="persistenceId"/>.
    /// </summary>
    /// <param name="persistenceId">Id of the stream to inspect.</param>
    /// <param name="fromSequenceNr">Not used by this implementation.</param>
    /// <returns>A completed task holding the highest sequence number, or 0 when nothing is stored.</returns>
    public override Task<long> ReadHighestSequenceNrAsync(string persistenceId, long fromSequenceNr)
    {
        SortedSet<IPersistentRepresentation> eventStream;
        if (!_eventStreams.TryGetValue(persistenceId, out eventStream))
        {
            return Task.FromResult(0L);
        }

        IPersistentRepresentation last;
        lock (eventStream)
        {
            // Max follows the set's comparer (sequence number) and yields null for an
            // empty set, without enumerating every element.
            last = eventStream.Max;
        }

        return Task.FromResult(last == null ? 0L : last.SequenceNr);
    }

    /// <summary>
    /// Schedules a replay of the stored messages on the default global dispatcher.
    /// </summary>
    /// <param name="persistenceId">Id of the stream to replay.</param>
    /// <param name="fromSequenceNr">Lower sequence number bound (inclusive).</param>
    /// <param name="toSequenceNr">Upper sequence number bound (inclusive).</param>
    /// <param name="max">Maximum number of messages to replay.</param>
    /// <param name="replayCallback">Invoked once per replayed message.</param>
    /// <returns>
    /// A task that completes when the replay has finished, or faults with the exception
    /// thrown during the replay.
    /// </returns>
    public override Task ReplayMessagesAsync(string persistenceId, long fromSequenceNr, long toSequenceNr, long max, Action<IPersistentRepresentation> replayCallback)
    {
        var dispatcher = Context.System.Dispatchers.DefaultGlobalDispatcher;
        var promise = new TaskCompletionSource<object>();
        dispatcher.Schedule(() =>
        {
            try
            {
                Replay(persistenceId, fromSequenceNr, toSequenceNr, max, replayCallback);
                promise.SetResult(Ack);
            }
            catch (Exception e)
            {
                promise.SetException(e);
            }
        });
        return promise.Task;
    }

    /// <summary>
    /// Appends the given messages to the stream of their persistence id,
    /// creating the stream on first use.
    /// </summary>
    /// <param name="messages">Messages to store.</param>
    public override void WriteMessages(IEnumerable<IPersistentRepresentation> messages)
    {
        foreach (var message in messages)
        {
            // Factory overload: a new set is only allocated when the id is not known yet.
            var stream = _eventStreams.GetOrAdd(message.PersistenceId,
                id => new SortedSet<IPersistentRepresentation>(PersistentComparer.Instance));
            lock (stream)
            {
                stream.Add(message);
            }
        }
    }

    /// <summary>
    /// Feeds the stored messages of <paramref name="pid"/> within the inclusive sequence
    /// number range to <paramref name="replay"/>, at most <paramref name="max"/> of them.
    /// </summary>
    private void Replay(string pid, long from, long to, long max, Action<IPersistentRepresentation> replay)
    {
        SortedSet<IPersistentRepresentation> eventStream;
        if (!_eventStreams.TryGetValue(pid, out eventStream))
        {
            return;
        }

        // Snapshot under the lock so the callbacks run without holding it and the
        // enumeration cannot be invalidated by a concurrent write or delete.
        IPersistentRepresentation[] snapshot;
        lock (eventStream)
        {
            snapshot = eventStream.ToArray();
        }

        IEnumerable<IPersistentRepresentation> s = snapshot;
        if (from > 0) s = s.Where(p => p.SequenceNr >= from);
        // Sequence numbers are longs: compare against long.MaxValue so bounds above
        // int.MaxValue are still honoured.
        if (to < long.MaxValue) s = s.Where(p => p.SequenceNr <= to);
        // max < Length guarantees the value fits in an int; clamp negatives to zero.
        if (max < snapshot.Length) s = s.Take((int)Math.Max(0L, max));

        foreach (var persistent in s)
        {
            replay(persistent);
        }
    }
}
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Akka.Persistence.Sandbox.Journal | |
{ | |
using Configuration; | |
using Akka.Persistence.TestKit.Journal; | |
using Xunit; | |
using Actor; | |
using FluentAssertions; | |
/// <summary>
/// Spec that runs against <c>MyJournal</c> by deriving from <c>JournalSpec</c> with a
/// configuration that selects it as the journal plugin.
/// </summary>
public class MyJournalSpec : JournalSpec | |
{ | |
// Configuration passed to the base spec; built once in the static constructor.
private static readonly Config SpecConfig; | |
// Parses the HOCON below: it points akka.persistence.journal.plugin at the "my" section,
// which names the MyJournal class (assembly Akka.Persistence.Sandbox) and its dispatcher.
static MyJournalSpec() | |
{ | |
SpecConfig = ConfigurationFactory.ParseString(@" | |
akka.persistence { | |
publish-plugin-commands = on | |
journal { | |
plugin = ""akka.persistence.journal.my"" | |
my { | |
class = ""Akka.Persistence.Sandbox.Journal.MyJournal, Akka.Persistence.Sandbox"" | |
plugin-dispatcher = ""akka.actor.default-dispatcher"" | |
} | |
} | |
}"); | |
} | |
// Hands the configuration and the name "MyJournalSpec" to the base spec.
public MyJournalSpec() : base(SpecConfig, "MyJournalSpec") { } | |
// Smoke test: Journal (presumably provided by the JournalSpec base - confirm) must be set.
[Fact] | |
public void Journal_should_not_be_null() | |
{ | |
this.Journal.Should().NotBeNull(); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace Akka.Persistence.Sandbox.Journal | |
{ | |
using System.Collections.Generic; | |
/// <summary>
/// Orders <see cref="IPersistentRepresentation"/> instances by their sequence number.
/// </summary>
internal class PersistentComparer : Comparer<IPersistentRepresentation>
{
    /// <summary>
    /// Shared comparer instance. Declared readonly so the singleton cannot be swapped
    /// out from under collections that were built with it.
    /// </summary>
    public static readonly PersistentComparer Instance = new PersistentComparer();

    /// <summary>
    /// Compares two messages by <c>SequenceNr</c>.
    /// </summary>
    /// <param name="x">First message.</param>
    /// <param name="y">Second message.</param>
    /// <returns>
    /// A negative value, zero, or a positive value when the sequence number of
    /// <paramref name="x"/> is lower than, equal to, or higher than that of <paramref name="y"/>.
    /// </returns>
    public override int Compare(IPersistentRepresentation x, IPersistentRepresentation y)
    {
        return Comparer<long>.Default.Compare(x.SequenceNr, y.SequenceNr);
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment