Created
May 11, 2017 21:42
-
-
Save dasjestyr/6bd4e435959ffe0fe1b65f3ced6f06c5 to your computer and use it in GitHub Desktop.
EventStore replication
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class EventReplicationService | |
{ | |
private readonly string _sourceName; | |
private readonly string _destinationName; | |
private readonly IEventStoreConnection _sourceConnection; | |
private readonly IEventStoreConnection _destinationConnection; | |
private readonly MongoRepository<ReplicationIndexCache> _mongoRepository; | |
private EventStoreAllCatchUpSubscription _subscription; | |
internal EventReplicationService( | |
IEventStoreConnection sourceConnection, | |
IEventStoreConnection destinationConnection, | |
IMongoDatabase mongoDatabase, | |
string sourceName, | |
string destinationName) | |
{ | |
_sourceConnection = sourceConnection; | |
_destinationConnection = destinationConnection; | |
_sourceName = sourceName; | |
_destinationName = destinationName; | |
_mongoRepository = new MongoRepository<ReplicationIndexCache>(mongoDatabase, "ReplicationIndexCache"); | |
} | |
public async void Start() | |
{ | |
var position = await FindLastIndex(); | |
var logMessage = position.HasValue | |
? $"Subscribing to all streams from {position.Value.CommitPosition} / {position.Value.PreparePosition}" | |
: "Subscribing to all streams from the beginning..."; | |
Logger.Debug(logMessage, this); | |
_subscription = _sourceConnection.SubscribeToAllFrom( | |
position, | |
CatchUpSubscriptionSettings.Default, | |
EventAppeared, | |
subscriptionDropped: (subscription, reason, ex) => Logger.Fatal($"Subscription dropped. Reason: {reason}. {ex.StackTrace}", this, ex)); | |
} | |
private async void EventAppeared(EventStoreCatchUpSubscription eventStoreCatchUpSubscription, ResolvedEvent resolvedEvent) | |
{ | |
if (resolvedEvent.Event.EventType.StartsWith("$")) | |
return; | |
Logger.Debug($"(REPLICATION EVENT): {resolvedEvent.Event.EventType} -> {resolvedEvent.Event.EventStreamId}", this); | |
// write it to the destination | |
await _destinationConnection.AppendToStreamAsync( | |
resolvedEvent.Event.EventStreamId, | |
ExpectedVersion.Any, | |
new EventData( | |
resolvedEvent.Event.EventId, | |
resolvedEvent.Event.EventType, | |
resolvedEvent.Event.IsJson, | |
resolvedEvent.Event.Data, | |
resolvedEvent.Event.Metadata)); | |
// not sure why this would ever happen | |
if (!resolvedEvent.OriginalPosition.HasValue) | |
return; | |
// update the cache | |
await SaveIndex( | |
resolvedEvent.OriginalPosition.Value.CommitPosition, | |
resolvedEvent.OriginalPosition.Value.PreparePosition); | |
} | |
public void Stop() | |
{ | |
_subscription.Stop(); | |
} | |
private async Task<Position?> FindLastIndex() | |
{ | |
var cache = await _mongoRepository.FindAsync(c => c.Source == _sourceName && c.Destination == _destinationName).ConfigureAwait(false); | |
var cacheFile = cache?.SingleOrDefault(); | |
var position = cacheFile == null | |
? (Position?) null | |
: new Position(cacheFile.Commit, cacheFile.Prepare); | |
return position; | |
} | |
private async Task SaveIndex(long commit, long prepare) | |
{ | |
var cacheFile = new ReplicationIndexCache{ Source = _sourceName, Destination = _destinationName, Commit = commit, Prepare = prepare}; | |
var filter = | |
Builders<ReplicationIndexCache>.Filter.Eq(file => file.Source, _sourceName) | |
& Builders<ReplicationIndexCache>.Filter.Eq(file => file.Destination, _destinationName); | |
await _mongoRepository.DatabaseCollection.UpdateOneAsync( | |
filter, | |
Builders<ReplicationIndexCache>.Update | |
.Set(file => file.Source, cacheFile.Source) | |
.Set(file => file.Destination, cacheFile.Destination) | |
.Set(file => file.Commit, cacheFile.Commit) | |
.Set(file => file.Prepare, cacheFile.Prepare), | |
new UpdateOptions {IsUpsert = true}) | |
.ConfigureAwait(false); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment