Skip to content

Instantly share code, notes, and snippets.

@elexisvenator
Last active March 24, 2022 22:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save elexisvenator/898dfc700f943d5ee4c3c8bb917b1370 to your computer and use it in GitHub Desktop.
Save elexisvenator/898dfc700f943d5ee4c3c8bb917b1370 to your computer and use it in GitHub Desktop.
Testing IQuerySession usage in projections

What is being tested?

This test is to determine how the querysession behaves when used as part of a create/apply method on a projection.

    public record Document([property: Identity] Guid DocumentId, string AuthorName, Guid AuthorId)
    {
        public static async Task<Document> Create(DocumentCreated @event, IQuerySession session)
        {
            var author = await session.Events.AggregateStreamAsync<User>(@event.AuthorId);
            // ^ ----------------------- here!
            Debug.Assert(author != null, nameof(author) + " != null");

            return new Document(@event.DocumentId, author.Name, @event.AuthorId);
        }

        public async Task<Document> Apply(ReferencedUserUpdated @event, IQuerySession session, Document current)
        {
            if (current.AuthorId != @event.ReferencedUserId)
            {
                return current;
            }

            var author = await session.Events.AggregateStreamAsync<User>(@event.ReferencedUserId);
            // ^ ----------------------- and here!
            Debug.Assert(author != null, nameof(author) + " != null");

            return current with { AuthorName = author.Name };
        }
    }

Scenario

3 projections:

  • Document - containing user id, author id and author name (lifecycle varies per test)
  • User - containing user id and name (lifecycle is Live)
  • DocumentsByAuthor - a reverse lookup to find all documents that reference an auther (livecycle is Inline)

All tests vary by the configured lifecycle of Document and follow these steps:

  1. Create a UserCreated event, with a random name
  2. Create a DocumentCreated event, with the auther being the user we just created
  3. Create a UserUpdatedEvent, changing the name to a new random name.
    • As part of this, use DocumentsByAuthor to find all documents that reference the user and send a ReferencedUserUpdated event for each.
  4. Query the Document projection, and check the author name.

Results

Expected

By the end of the test, querying the Document projection for the author's name should return the updated name.

Actual: Live lifecycle

Everything works as intended in the Live lifecycle. Document.AuthorName is the updated name.

Actual: Inline lifecycle

Document.AuthorName returns the original name. :(
This is because at the point that AggregateStreamAsync is called, the events are not actually written to the database yet. They are still in session.PendingChanges (which is not accessible without some nasty casting). There isnt any way for the programmer to take events from the store, append pending events from the session, then run AggregateStreamAsync on the combined list. It would be great if this was done automatically behind the scenes 🥺

Actual: Async lifecycle

Running the test will result in the daemon timing out, but if you debug then the real issue can be seen - calls to AggregateStreamAsync result in a null reference exception. The property that is null is actually the session.Connection. This seems to be a bug with query sessions in async projections. Also, this test uses conjoined multi-tenancy. While this is not essential to reproducing the issue it does highlight another problem - session.Tenant is not the tenant of the event, rather it is *DEFAULT*.

Packages used:

      <PackageReference Include="FluentAssertions" Version="6.5.1" />
      <PackageReference Include="Marten" Version="5.0.0" />
      <PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.1.0" />
      <PackageReference Include="xunit" Version="2.4.1" />
      <PackageReference Include="xunit.extensibility.core" Version="2.4.1" />
      <PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
// actual tests
using System.Diagnostics;
using Baseline.Dates;
using Marten;
using Marten.Events.Projections;
using Marten.Linq;
using Marten.Schema;
using Marten.Schema.Identity;
using Marten.Storage;
using Weasel.Core;
using Xunit;
namespace Tests;
public class QuerySessionInApply : OneOffConfigurationsContext
{
// test data
private readonly Guid _userId = CombGuidIdGeneration.NewGuid();
private readonly string _userName = Guid.NewGuid().ToString();
private readonly string _newUserName = Guid.NewGuid().ToString();
private readonly Guid _documentId = CombGuidIdGeneration.NewGuid();
private const string Tenant = "my-tenant-id";
private void ConfigureStoreWithProjectionLifecycle(ProjectionLifecycle documentProjectionLifecycle)
{
StoreOptions(options =>
{
options.Policies.AllDocumentsAreMultiTenanted();
options.Advanced.DefaultTenantUsageEnabled = true;
options.Events.TenancyStyle = TenancyStyle.Conjoined;
options.AutoCreateSchemaObjects = AutoCreate.All;
options.GeneratedCodeMode = LamarCodeGeneration.TypeLoadMode.Auto;
options.Projections.SelfAggregate<User>(ProjectionLifecycle.Live);
options.Projections.SelfAggregate<Document>(documentProjectionLifecycle);
options.Projections.Add<DocumentsByAuthor.ViewProjection>(ProjectionLifecycle.Inline);
});
}
private async Task CreateUser()
{
// create a user
await using var session = TheStore.LightweightSession(Tenant);
var userCreated = new UserCreated(_userId, _userName);
session.Events.StartStream<User>(_userId, userCreated);
await session.SaveChangesAsync();
}
private async Task CreateDoc()
{
// create a doc
await using var session = TheStore.LightweightSession(Tenant);
var documentCreated = new DocumentCreated(_documentId, _userId);
session.Events.StartStream<DocumentCreated>(_documentId, documentCreated);
await session.SaveChangesAsync();
}
private async Task UpdateUserName()
{
// change the username
// apply to document streams
await using var session = TheStore.LightweightSession(Tenant);
var updatedUser = new UserUpdated(_userId, _newUserName);
session.Events.Append(updatedUser.UserId, updatedUser);
foreach (var documentId in await session.Query<DocumentsByAuthor>().Where(a => a.AuthorId == updatedUser.UserId).Select(a => a.DocumentIds).SingleOrDefaultAsync() ?? Array.Empty<Guid>())
{
var referencedUserUpdated = new ReferencedUserUpdated(documentId, updatedUser.UserId);
session.Events.Append(documentId, referencedUserUpdated);
}
await session.SaveChangesAsync();
}
[Fact]
public async Task OnLiveProjection()
{
ConfigureStoreWithProjectionLifecycle(ProjectionLifecycle.Live);
await CreateUser();
await CreateDoc();
// get the doc
await using (var session = TheStore.QuerySession(Tenant))
{
var document = await session.Events.AggregateStreamAsync<Document>(_documentId);
Assert.NotNull(document);
Assert.Equal(_documentId, document!.DocumentId);
Assert.Equal(_userId, document.AuthorId);
Assert.Equal(_userName, document.AuthorName);
}
await UpdateUserName();
// get the doc
await using (var session = TheStore.QuerySession(Tenant))
{
var document = await session.Events.AggregateStreamAsync<Document>(_documentId);
Assert.NotNull(document);
Assert.Equal(_documentId, document!.DocumentId);
Assert.Equal(_userId, document.AuthorId);
// Because this is a live projection, the name should be the latest one
Assert.Equal(_newUserName, document.AuthorName);
}
}
[Fact]
public async Task OnInlineProjection()
{
ConfigureStoreWithProjectionLifecycle(ProjectionLifecycle.Inline);
await CreateUser();
await CreateDoc();
// get the doc
await using (var session = TheStore.QuerySession(Tenant))
{
var document = await session.Query<Document>().SingleAsync(d => d.DocumentId == _documentId, CancellationToken.None);
Assert.NotNull(document);
Assert.Equal(_documentId, document!.DocumentId);
Assert.Equal(_userId, document.AuthorId);
Assert.Equal(_userName, document.AuthorName);
}
await UpdateUserName();
// get the doc
await using (var session = TheStore.QuerySession(Tenant))
{
var document = await session.Query<Document>().SingleAsync(d => d.DocumentId == _documentId, CancellationToken.None);
Assert.NotNull(document);
Assert.Equal(_documentId, document!.DocumentId);
Assert.Equal(_userId, document.AuthorId);
// Because this is an inline projection, the updated name wouldnt have ben processed (the event is in session.PendingChanges)
// Bug: Support AggregateStreamAsync<T> in inline Apply methods also aggregating from pending changes
Assert.NotEqual(_newUserName, document.AuthorName);
// So we expect the original user name
Assert.Equal(_userName, document.AuthorName);
}
}
[Fact]
public async Task OnAsyncProjection()
{
ConfigureStoreWithProjectionLifecycle(ProjectionLifecycle.Async);
using var daemon = await TheStore.BuildProjectionDaemonAsync();
await CreateUser();
await CreateDoc();
await daemon.RebuildProjection<Document>(CancellationToken.None);
// get the doc
await using (var session = TheStore.QuerySession(Tenant))
{
var document = await session.Query<Document>().SingleAsync(d => d.DocumentId == _documentId, CancellationToken.None);
Assert.NotNull(document);
Assert.Equal(_documentId, document!.DocumentId);
Assert.Equal(_userId, document.AuthorId);
Assert.Equal(_userName, document.AuthorName);
}
await UpdateUserName();
await daemon.RebuildProjection<Document>(CancellationToken.None);
// get the doc
await using (var session = TheStore.QuerySession(Tenant))
{
var document = await session.Query<Document>().SingleAsync(d => d.DocumentId == _documentId, CancellationToken.None);
Assert.NotNull(document);
Assert.Equal(_documentId, document!.DocumentId);
Assert.Equal(_userId, document.AuthorId);
// This should work, but doesn't
// Bug: Using querysession in async projection has a null session.Connection
Assert.Equal(_newUserName, document.AuthorName);
}
}
#region Events
// User stream
public record UserCreated(Guid UserId, string Name);
public record UserUpdated(Guid UserId, string NewName);
// Document stream
public record DocumentCreated(Guid DocumentId, Guid AuthorId);
public record ReferencedUserUpdated(Guid DocumentId, Guid ReferencedUserId);
#endregion
#region Projections
public record User([property: Identity] Guid UserId, string Name)
{
public static User Create(UserCreated @event) => new User(@event.UserId, @event.Name);
public User Apply(UserUpdated @event, User current) => current with { Name = @event.NewName };
}
public record Document([property: Identity] Guid DocumentId, string AuthorName, Guid AuthorId)
{
public static async Task<Document> Create(DocumentCreated @event, IQuerySession session)
{
var author = await session.Events.AggregateStreamAsync<User>(@event.AuthorId);
Debug.Assert(author != null, nameof(author) + " != null");
return new Document(@event.DocumentId, author.Name, @event.AuthorId);
}
public async Task<Document> Apply(ReferencedUserUpdated @event, IQuerySession session, Document current)
{
if (current.AuthorId != @event.ReferencedUserId)
{
return current;
}
var author = await session.Events.AggregateStreamAsync<User>(@event.ReferencedUserId);
Debug.Assert(author != null, nameof(author) + " != null");
return current with { AuthorName = author.Name };
}
}
public record DocumentsByAuthor([property: Identity] Guid AuthorId, IReadOnlyList<Guid> DocumentIds)
{
public static DocumentsByAuthor Create(UserCreated @event) =>
new DocumentsByAuthor(@event.UserId, Array.Empty<Guid>());
public DocumentsByAuthor Apply(DocumentCreated @event, DocumentsByAuthor current) =>
current with
{
DocumentIds = new List<Guid>(current.DocumentIds) { @event.DocumentId }
};
public class ViewProjection : ViewProjection<DocumentsByAuthor, Guid>
{
public ViewProjection()
{
Identity<UserCreated>(e => e.UserId);
Identity<DocumentCreated>(e => e.AuthorId);
}
}
}
#endregion
}
// This is 99% the same file as the one in Marten unit testing
using Marten;
using Marten.Events.Projections;
using Marten.Internal.CodeGeneration;
using Npgsql;
using Weasel.Core;
using Weasel.Postgresql;
using Xunit;
namespace Tests
{
/// <summary>
/// Use this if the tests in a fixture are going to use
/// all custom StoreOptions configuration
/// </summary>
[Collection("OneOffs")]
public abstract class OneOffConfigurationsContext : IDisposable
{
private string _schemaName;
private DocumentStore? _store;
private IDocumentSession? _session;
private readonly IList<IDisposable> _disposables = new List<IDisposable>();
private readonly string _connectionString = "<SNIP>"; //TODO: get it from appsetting.json
public string SchemaName => _schemaName;
protected OneOffConfigurationsContext()
{
_schemaName = GetType().Name.ToLower().Sanitize();
}
public IList<IDisposable> Disposables => _disposables;
/// <summary>
/// This will create an additional DocumentStore with the same schema name
/// The base context will track it and dispose it later
///
/// This is meant for tests on schema detection and migrations
/// </summary>
/// <param name="configure"></param>
/// <returns></returns>
protected DocumentStore SeparateStore(Action<StoreOptions>? configure)
{
var options = new StoreOptions
{
DatabaseSchemaName = SchemaName,
};
options.Connection(_connectionString);
configure?.Invoke(options);
var store = new DocumentStore(options);
_disposables.Add(store);
return store;
}
protected void StoreOptions(Action<StoreOptions> configure, bool cleanAll = true)
{
var options = new StoreOptions();
options.Connection(_connectionString);
// Can be overridden
options.AutoCreateSchemaObjects = AutoCreate.All;
options.NameDataLength = 64;
options.DatabaseSchemaName = _schemaName;
configure(options);
if (cleanAll)
{
using (var conn = new NpgsqlConnection(_connectionString))
{
conn.Open();
conn.CreateCommand($"drop schema if exists {_schemaName} cascade")
.ExecuteNonQuery();
}
}
_store = new DocumentStore(options);
_disposables.Add(_store);
}
protected DocumentStore TheStore
{
get
{
if (_store == null)
{
StoreOptions(_ => {});
}
return _store!;
}
}
protected virtual IDocumentSession TheSession
{
get
{
if (_session == null)
{
_session = TheStore.OpenSession(DocumentTracking);
_disposables.Add(_session);
}
return _session;
}
}
/// <summary>
/// Sets the default DocumentTracking for this context. Default is "None"
/// </summary>
private DocumentTracking DocumentTracking { get; set; } = DocumentTracking.None;
public void Dispose()
{
foreach (var disposable in _disposables)
{
disposable.Dispose();
}
}
protected async Task AppendEvent(Guid streamId, params object[] events)
{
TheSession.Events.Append(streamId, events);
await TheSession.SaveChangesAsync();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment