|
// actual tests |
|
|
|
using System.Diagnostics; |
|
using Baseline.Dates; |
|
using Marten; |
|
using Marten.Events.Projections; |
|
using Marten.Linq; |
|
using Marten.Schema; |
|
using Marten.Schema.Identity; |
|
using Marten.Storage; |
|
using Weasel.Core; |
|
using Xunit; |
|
|
|
namespace Tests; |
|
|
|
public class QuerySessionInApply : OneOffConfigurationsContext |
|
{ |
|
// test data |
|
private readonly Guid _userId = CombGuidIdGeneration.NewGuid(); |
|
private readonly string _userName = Guid.NewGuid().ToString(); |
|
private readonly string _newUserName = Guid.NewGuid().ToString(); |
|
private readonly Guid _documentId = CombGuidIdGeneration.NewGuid(); |
|
private const string Tenant = "my-tenant-id"; |
|
|
|
private void ConfigureStoreWithProjectionLifecycle(ProjectionLifecycle documentProjectionLifecycle) |
|
{ |
|
StoreOptions(options => |
|
{ |
|
options.Policies.AllDocumentsAreMultiTenanted(); |
|
options.Advanced.DefaultTenantUsageEnabled = true; |
|
options.Events.TenancyStyle = TenancyStyle.Conjoined; |
|
|
|
options.AutoCreateSchemaObjects = AutoCreate.All; |
|
options.GeneratedCodeMode = LamarCodeGeneration.TypeLoadMode.Auto; |
|
|
|
options.Projections.SelfAggregate<User>(ProjectionLifecycle.Live); |
|
options.Projections.SelfAggregate<Document>(documentProjectionLifecycle); |
|
options.Projections.Add<DocumentsByAuthor.ViewProjection>(ProjectionLifecycle.Inline); |
|
}); |
|
} |
|
|
|
private async Task CreateUser() |
|
{ |
|
// create a user |
|
await using var session = TheStore.LightweightSession(Tenant); |
|
var userCreated = new UserCreated(_userId, _userName); |
|
session.Events.StartStream<User>(_userId, userCreated); |
|
await session.SaveChangesAsync(); |
|
} |
|
|
|
private async Task CreateDoc() |
|
{ |
|
// create a doc |
|
await using var session = TheStore.LightweightSession(Tenant); |
|
var documentCreated = new DocumentCreated(_documentId, _userId); |
|
session.Events.StartStream<DocumentCreated>(_documentId, documentCreated); |
|
await session.SaveChangesAsync(); |
|
} |
|
|
|
private async Task UpdateUserName() |
|
{ |
|
// change the username |
|
// apply to document streams |
|
await using var session = TheStore.LightweightSession(Tenant); |
|
var updatedUser = new UserUpdated(_userId, _newUserName); |
|
session.Events.Append(updatedUser.UserId, updatedUser); |
|
foreach (var documentId in await session.Query<DocumentsByAuthor>().Where(a => a.AuthorId == updatedUser.UserId).Select(a => a.DocumentIds).SingleOrDefaultAsync() ?? Array.Empty<Guid>()) |
|
{ |
|
var referencedUserUpdated = new ReferencedUserUpdated(documentId, updatedUser.UserId); |
|
session.Events.Append(documentId, referencedUserUpdated); |
|
} |
|
|
|
await session.SaveChangesAsync(); |
|
} |
|
|
|
[Fact] |
|
public async Task OnLiveProjection() |
|
{ |
|
ConfigureStoreWithProjectionLifecycle(ProjectionLifecycle.Live); |
|
|
|
await CreateUser(); |
|
|
|
await CreateDoc(); |
|
|
|
// get the doc |
|
await using (var session = TheStore.QuerySession(Tenant)) |
|
{ |
|
var document = await session.Events.AggregateStreamAsync<Document>(_documentId); |
|
Assert.NotNull(document); |
|
Assert.Equal(_documentId, document!.DocumentId); |
|
Assert.Equal(_userId, document.AuthorId); |
|
Assert.Equal(_userName, document.AuthorName); |
|
} |
|
|
|
await UpdateUserName(); |
|
|
|
// get the doc |
|
await using (var session = TheStore.QuerySession(Tenant)) |
|
{ |
|
var document = await session.Events.AggregateStreamAsync<Document>(_documentId); |
|
Assert.NotNull(document); |
|
Assert.Equal(_documentId, document!.DocumentId); |
|
Assert.Equal(_userId, document.AuthorId); |
|
|
|
// Because this is a live projection, the name should be the latest one |
|
Assert.Equal(_newUserName, document.AuthorName); |
|
} |
|
} |
|
|
|
[Fact] |
|
public async Task OnInlineProjection() |
|
{ |
|
ConfigureStoreWithProjectionLifecycle(ProjectionLifecycle.Inline); |
|
|
|
await CreateUser(); |
|
|
|
await CreateDoc(); |
|
|
|
// get the doc |
|
await using (var session = TheStore.QuerySession(Tenant)) |
|
{ |
|
var document = await session.Query<Document>().SingleAsync(d => d.DocumentId == _documentId, CancellationToken.None); |
|
Assert.NotNull(document); |
|
Assert.Equal(_documentId, document!.DocumentId); |
|
Assert.Equal(_userId, document.AuthorId); |
|
Assert.Equal(_userName, document.AuthorName); |
|
} |
|
|
|
await UpdateUserName(); |
|
|
|
// get the doc |
|
await using (var session = TheStore.QuerySession(Tenant)) |
|
{ |
|
var document = await session.Query<Document>().SingleAsync(d => d.DocumentId == _documentId, CancellationToken.None); |
|
Assert.NotNull(document); |
|
Assert.Equal(_documentId, document!.DocumentId); |
|
Assert.Equal(_userId, document.AuthorId); |
|
|
|
// Because this is an inline projection, the updated name wouldnt have ben processed (the event is in session.PendingChanges) |
|
// Bug: Support AggregateStreamAsync<T> in inline Apply methods also aggregating from pending changes |
|
Assert.NotEqual(_newUserName, document.AuthorName); |
|
// So we expect the original user name |
|
Assert.Equal(_userName, document.AuthorName); |
|
} |
|
} |
|
|
|
|
|
[Fact] |
|
public async Task OnAsyncProjection() |
|
{ |
|
ConfigureStoreWithProjectionLifecycle(ProjectionLifecycle.Async); |
|
using var daemon = await TheStore.BuildProjectionDaemonAsync(); |
|
|
|
await CreateUser(); |
|
|
|
await CreateDoc(); |
|
await daemon.RebuildProjection<Document>(CancellationToken.None); |
|
|
|
// get the doc |
|
await using (var session = TheStore.QuerySession(Tenant)) |
|
{ |
|
var document = await session.Query<Document>().SingleAsync(d => d.DocumentId == _documentId, CancellationToken.None); |
|
Assert.NotNull(document); |
|
Assert.Equal(_documentId, document!.DocumentId); |
|
Assert.Equal(_userId, document.AuthorId); |
|
Assert.Equal(_userName, document.AuthorName); |
|
} |
|
|
|
await UpdateUserName(); |
|
await daemon.RebuildProjection<Document>(CancellationToken.None); |
|
|
|
// get the doc |
|
await using (var session = TheStore.QuerySession(Tenant)) |
|
{ |
|
var document = await session.Query<Document>().SingleAsync(d => d.DocumentId == _documentId, CancellationToken.None); |
|
Assert.NotNull(document); |
|
Assert.Equal(_documentId, document!.DocumentId); |
|
Assert.Equal(_userId, document.AuthorId); |
|
|
|
// This should work, but doesn't |
|
// Bug: Using querysession in async projection has a null session.Connection |
|
Assert.Equal(_newUserName, document.AuthorName); |
|
} |
|
} |
|
|
|
#region Events |
|
|
|
// User stream |
|
public record UserCreated(Guid UserId, string Name); |
|
public record UserUpdated(Guid UserId, string NewName); |
|
|
|
// Document stream |
|
public record DocumentCreated(Guid DocumentId, Guid AuthorId); |
|
public record ReferencedUserUpdated(Guid DocumentId, Guid ReferencedUserId); |
|
|
|
#endregion |
|
|
|
|
|
#region Projections |
|
|
|
public record User([property: Identity] Guid UserId, string Name) |
|
{ |
|
public static User Create(UserCreated @event) => new User(@event.UserId, @event.Name); |
|
public User Apply(UserUpdated @event, User current) => current with { Name = @event.NewName }; |
|
} |
|
|
|
public record Document([property: Identity] Guid DocumentId, string AuthorName, Guid AuthorId) |
|
{ |
|
public static async Task<Document> Create(DocumentCreated @event, IQuerySession session) |
|
{ |
|
var author = await session.Events.AggregateStreamAsync<User>(@event.AuthorId); |
|
Debug.Assert(author != null, nameof(author) + " != null"); |
|
|
|
return new Document(@event.DocumentId, author.Name, @event.AuthorId); |
|
} |
|
|
|
public async Task<Document> Apply(ReferencedUserUpdated @event, IQuerySession session, Document current) |
|
{ |
|
if (current.AuthorId != @event.ReferencedUserId) |
|
{ |
|
return current; |
|
} |
|
|
|
var author = await session.Events.AggregateStreamAsync<User>(@event.ReferencedUserId); |
|
Debug.Assert(author != null, nameof(author) + " != null"); |
|
|
|
return current with { AuthorName = author.Name }; |
|
} |
|
} |
|
|
|
public record DocumentsByAuthor([property: Identity] Guid AuthorId, IReadOnlyList<Guid> DocumentIds) |
|
{ |
|
public static DocumentsByAuthor Create(UserCreated @event) => |
|
new DocumentsByAuthor(@event.UserId, Array.Empty<Guid>()); |
|
|
|
public DocumentsByAuthor Apply(DocumentCreated @event, DocumentsByAuthor current) => |
|
current with |
|
{ |
|
DocumentIds = new List<Guid>(current.DocumentIds) { @event.DocumentId } |
|
}; |
|
|
|
public class ViewProjection : ViewProjection<DocumentsByAuthor, Guid> |
|
{ |
|
public ViewProjection() |
|
{ |
|
Identity<UserCreated>(e => e.UserId); |
|
Identity<DocumentCreated>(e => e.AuthorId); |
|
} |
|
} |
|
} |
|
|
|
#endregion |
|
|
|
|
|
} |