Skip to content

Instantly share code, notes, and snippets.

@dasjestyr
Created May 11, 2017 21:42
Show Gist options
  • Save dasjestyr/6bd4e435959ffe0fe1b65f3ced6f06c5 to your computer and use it in GitHub Desktop.
Save dasjestyr/6bd4e435959ffe0fe1b65f3ced6f06c5 to your computer and use it in GitHub Desktop.
EventStore replication
/// <summary>
/// Replicates events from a source Event Store connection to a destination connection
/// by subscribing to all streams on the source and appending each received event to the
/// stream of the same name on the destination. The position of the last replicated event
/// is persisted through a <c>MongoRepository&lt;ReplicationIndexCache&gt;</c>, keyed by the
/// source and destination names, so that a later <see cref="Start"/> can resume from it.
/// </summary>
public class EventReplicationService
{
    private readonly string _sourceName;
    private readonly string _destinationName;
    private readonly IEventStoreConnection _sourceConnection;
    private readonly IEventStoreConnection _destinationConnection;
    private readonly MongoRepository<ReplicationIndexCache> _mongoRepository;

    // Assigned by Start() after its await completes; null until then.
    private EventStoreAllCatchUpSubscription _subscription;

    /// <summary>
    /// Creates a replication service for one source/destination pair.
    /// </summary>
    /// <param name="sourceConnection">Connection the events are read from.</param>
    /// <param name="destinationConnection">Connection the events are appended to.</param>
    /// <param name="mongoDatabase">Database handed to the repository that stores the last replicated position.</param>
    /// <param name="sourceName">Name identifying the source in the stored position record.</param>
    /// <param name="destinationName">Name identifying the destination in the stored position record.</param>
    internal EventReplicationService(
        IEventStoreConnection sourceConnection,
        IEventStoreConnection destinationConnection,
        IMongoDatabase mongoDatabase,
        string sourceName,
        string destinationName)
    {
        _sourceConnection = sourceConnection;
        _destinationConnection = destinationConnection;
        _sourceName = sourceName;
        _destinationName = destinationName;
        _mongoRepository = new MongoRepository<ReplicationIndexCache>(mongoDatabase, "ReplicationIndexCache");
    }

    /// <summary>
    /// Looks up the last stored position for this source/destination pair and subscribes to
    /// all streams on the source connection from that position, or from the beginning when
    /// no position has been stored.
    /// </summary>
    /// <remarks>
    /// This method is <c>async void</c>: control returns to the caller at the first incomplete
    /// await, so the subscription may not exist yet when the call returns, and an exception
    /// thrown here cannot be observed by the caller.
    /// </remarks>
    public async void Start()
    {
        var position = await FindLastIndex();

        var logMessage = position.HasValue
            ? $"Subscribing to all streams from {position.Value.CommitPosition} / {position.Value.PreparePosition}"
            : "Subscribing to all streams from the beginning...";
        Logger.Debug(logMessage, this);

        _subscription = _sourceConnection.SubscribeToAllFrom(
            position,
            CatchUpSubscriptionSettings.Default,
            EventAppeared,
            // The exception may be null (e.g. when the drop was requested via Stop()), so it
            // must not be dereferenced unconditionally while building the log message.
            subscriptionDropped: (subscription, reason, ex) => Logger.Fatal($"Subscription dropped. Reason: {reason}. {ex?.StackTrace}", this, ex));
    }

    /// <summary>
    /// Subscription callback: appends the received event to the destination and then stores
    /// the event's position. Events whose type starts with "$" are skipped.
    /// </summary>
    /// <param name="eventStoreCatchUpSubscription">The subscription that delivered the event (unused).</param>
    /// <param name="resolvedEvent">The event read from the source.</param>
    /// <remarks>
    /// NOTE(review): this handler is <c>async void</c>, so the subscription regains control at
    /// the first incomplete await and cannot wait for the append or the position update to
    /// finish. Confirm whether overlapping handlers / out-of-order position writes are acceptable.
    /// </remarks>
    private async void EventAppeared(EventStoreCatchUpSubscription eventStoreCatchUpSubscription, ResolvedEvent resolvedEvent)
    {
        // Event types are machine identifiers, so compare ordinally rather than by culture.
        if (resolvedEvent.Event.EventType.StartsWith("$", System.StringComparison.Ordinal))
            return;

        Logger.Debug($"(REPLICATION EVENT): {resolvedEvent.Event.EventType} -> {resolvedEvent.Event.EventStreamId}", this);

        // write it to the destination, keeping the original event id, type, data and metadata
        await _destinationConnection.AppendToStreamAsync(
            resolvedEvent.Event.EventStreamId,
            ExpectedVersion.Any,
            new EventData(
                resolvedEvent.Event.EventId,
                resolvedEvent.Event.EventType,
                resolvedEvent.Event.IsJson,
                resolvedEvent.Event.Data,
                resolvedEvent.Event.Metadata));

        // not sure why this would ever happen
        if (!resolvedEvent.OriginalPosition.HasValue)
            return;

        // update the cache
        await SaveIndex(
            resolvedEvent.OriginalPosition.Value.CommitPosition,
            resolvedEvent.OriginalPosition.Value.PreparePosition);
    }

    /// <summary>
    /// Stops the subscription created by <see cref="Start"/>. Does nothing when no
    /// subscription has been created yet.
    /// </summary>
    public void Stop()
    {
        // _subscription is still null if Start() was never called or has not finished subscribing.
        _subscription?.Stop();
    }

    /// <summary>
    /// Reads the stored position for this source/destination pair.
    /// </summary>
    /// <returns>The stored position, or <c>null</c> when no record was found.</returns>
    private async Task<Position?> FindLastIndex()
    {
        var cache = await _mongoRepository.FindAsync(c => c.Source == _sourceName && c.Destination == _destinationName).ConfigureAwait(false);

        // SingleOrDefault throws if more than one record matches the pair.
        var cacheFile = cache?.SingleOrDefault();
        var position = cacheFile == null
            ? (Position?) null
            : new Position(cacheFile.Commit, cacheFile.Prepare);

        return position;
    }

    /// <summary>
    /// Stores the given position for this source/destination pair, inserting the record
    /// when it does not exist yet (upsert).
    /// </summary>
    /// <param name="commit">Commit position of the last replicated event.</param>
    /// <param name="prepare">Prepare position of the last replicated event.</param>
    private async Task SaveIndex(long commit, long prepare)
    {
        var cacheFile = new ReplicationIndexCache{ Source = _sourceName, Destination = _destinationName, Commit = commit, Prepare = prepare};

        var filter =
            Builders<ReplicationIndexCache>.Filter.Eq(file => file.Source, _sourceName)
            & Builders<ReplicationIndexCache>.Filter.Eq(file => file.Destination, _destinationName);

        await _mongoRepository.DatabaseCollection.UpdateOneAsync(
                filter,
                Builders<ReplicationIndexCache>.Update
                    .Set(file => file.Source, cacheFile.Source)
                    .Set(file => file.Destination, cacheFile.Destination)
                    .Set(file => file.Commit, cacheFile.Commit)
                    .Set(file => file.Prepare, cacheFile.Prepare),
                new UpdateOptions {IsUpsert = true})
            .ConfigureAwait(false);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment