Skip to content

Instantly share code, notes, and snippets.

@joliver
Created April 5, 2011 04:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save joliver/903011 to your computer and use it in GitHub Desktop.
Save joliver/903011 to your computer and use it in GitHub Desktop.
Blog--CQRS: Out of Sequence Messages and Read Models
/*
 * [dbo].[Messages]
 *
 * One row per message, identified by the aggregate it belongs to and its
 * version number within that aggregate.
 *
 *   AggregateId  identifier of the aggregate the message belongs to
 *   Version      position of the message within the aggregate; must be > 0
 *   Inserted     UTC time the row was written (defaults to GETUTCDATE())
 *   Headers      binary blob; format is not defined in this script
 *   Payload      binary blob; format is not defined in this script
 *
 * The CHECK and DEFAULT constraints are named explicitly so that they carry
 * the same name in every environment instead of a system-generated one.
 */
CREATE TABLE [dbo].[Messages]
(
    [AggregateId] [uniqueidentifier] NOT NULL,
    [Version]     [int]              NOT NULL
        CONSTRAINT [CK_MessagesVersion] CHECK ([Version] > 0),
    [Inserted]    [datetime]         NOT NULL
        CONSTRAINT [DF_MessagesInserted] DEFAULT (GETUTCDATE()),
    [Headers]     [varbinary](max)   NOT NULL,
    [Payload]     [varbinary](max)   NOT NULL,
    -- A given (aggregate, version) pair can be stored only once.
    CONSTRAINT [PK_Messages] PRIMARY KEY CLUSTERED ([AggregateId], [Version])
);
/*
 * [dbo].[Sequence]
 *
 * One bookkeeping row per aggregate, used by the message-insert script to
 * decide when a contiguous run of versions has been collected.
 *
 *   AggregateId  identifier of the aggregate being tracked
 *   MinVersion   lowest version still being waited for; messages below this
 *                version are the ones the script selects and deletes
 *   MaxVersion   highest version recorded in the current window
 *   Messages     number of messages counted in the current window
 *   Cleared      UTC time the window was last reset (defaults to GETUTCDATE())
 *
 * The CHECK and DEFAULT constraints are named explicitly so that they carry
 * the same name in every environment instead of a system-generated one.
 */
CREATE TABLE [dbo].[Sequence]
(
    [AggregateId] [uniqueidentifier] NOT NULL,
    [MinVersion]  [int]              NOT NULL
        CONSTRAINT [CK_SequenceMinVersion] CHECK ([MinVersion] > 0),
    [MaxVersion]  [int]              NOT NULL
        CONSTRAINT [CK_SequenceMaxVersion] CHECK ([MaxVersion] > 0),
    [Messages]    [smallint]         NOT NULL
        CONSTRAINT [CK_SequenceMessages] CHECK ([Messages] >= 0),
    [Cleared]     [datetime]         NOT NULL
        CONSTRAINT [DF_SequenceCleared] DEFAULT (GETUTCDATE()),
    CONSTRAINT [PK_Sequence] PRIMARY KEY CLUSTERED ([AggregateId])
);
/*
 * Unique nonclustered index carrying every column of [dbo].[Sequence] as a
 * key column, led by [AggregateId].
 *
 * NOTE(review): [AggregateId] alone is already the clustered primary key, so
 * this index looks redundant with it -- confirm nothing depends on it by
 * name before considering its removal.
 */
CREATE UNIQUE NONCLUSTERED INDEX [IX_Sequence]
    ON [dbo].[Sequence]
    (
        [AggregateId],
        [MaxVersion],
        [Messages],
        [MinVersion],
        [Cleared]
    );
-- Row-level invariant: the window's upper bound may never fall below its
-- lower bound. WITH CHECK validates existing rows when the constraint is added.
ALTER TABLE [dbo].[Sequence]
    WITH CHECK
    ADD CONSTRAINT [CK_SequenceVersion]
        CHECK ([MaxVersion] >= [MinVersion]);
/*
 * Ensure a bookkeeping row exists for @AggregateId.
 *
 * A new aggregate starts with MinVersion = 1, MaxVersion = 1, no counted
 * messages, and a fixed placeholder Cleared date. If a row already exists,
 * nothing is inserted.
 *
 * The target columns are listed explicitly so the statement does not depend
 * on the physical column order of [Sequence].
 *
 * NOTE(review): two concurrent callers can both pass the NOT EXISTS check;
 * the primary key on [AggregateId] then rejects one of the inserts with a
 * duplicate-key error -- confirm callers retry in that case.
 */
INSERT INTO [Sequence]
    ([AggregateId], [MinVersion], [MaxVersion], [Messages], [Cleared])
SELECT @AggregateId, 1, 1, 0, '2000-01-01'
WHERE NOT EXISTS
      (
          SELECT *
          FROM [Sequence]
          WHERE [AggregateId] = @AggregateId
      );
/*
 * Store the incoming message unless it is a duplicate or already released.
 *
 * The row is inserted only when both conditions hold:
 *   - no message with the same (AggregateId, Version) is already stored;
 *   - the aggregate's MinVersion is not above @Version.
 *
 * The statement that follows reads @@ROWCOUNT to learn whether this insert
 * stored a row, so no other statement may be placed between the two.
 *
 * The target columns are listed explicitly so the statement does not depend
 * on the physical column order of [Messages].
 */
INSERT INTO [Messages]
    ([AggregateId], [Version], [Inserted], [Headers], [Payload])
SELECT @AggregateId, @Version, GETUTCDATE(), @Headers, @Payload
WHERE NOT EXISTS
      (
          SELECT *
          FROM [Messages]
          WHERE [AggregateId] = @AggregateId
            AND [Version] = @Version
      )
  AND EXISTS
      (
          SELECT *
          FROM [Sequence]
          WHERE [AggregateId] = @AggregateId
            AND [MinVersion] <= @Version
      );
/*
 * Account for the message that was just stored: bump the counter and raise
 * MaxVersion to @Version when @Version is the larger of the two.
 *
 * @@ROWCOUNT here is the row count of the immediately preceding statement
 * (the insert into [Messages]); when that insert stored nothing, this update
 * touches no rows.
 */
UPDATE [Sequence]
   SET [Messages]   = [Messages] + 1,
       [MaxVersion] = CASE
                          WHEN [MaxVersion] > @Version THEN [MaxVersion]
                          ELSE @Version
                      END
 WHERE @@ROWCOUNT > 0
   AND [AggregateId] = @AggregateId;
/*
 * Reset the window when it is complete, i.e. when the number of counted
 * messages exactly spans MinVersion..MaxVersion with no gaps.
 *
 * MinVersion and MaxVersion both become the old MaxVersion + 1 (every
 * right-hand side in a SET list reads the pre-update column value), the
 * counter returns to zero, and Cleared is stamped with the current UTC time.
 *
 * @@ROWCOUNT here is the row count of the immediately preceding update, so
 * this statement does nothing unless that update changed a row.
 */
UPDATE [Sequence]
   SET [MinVersion] = [MaxVersion] + 1,
       [MaxVersion] = [MaxVersion] + 1,
       [Messages]   = 0,
       [Cleared]    = GETUTCDATE()
 WHERE @@ROWCOUNT > 0
   AND [AggregateId] = @AggregateId
   AND [MinVersion] + [Messages] - 1 = [MaxVersion];
/*
 * Return, in version order, every stored message of this aggregate whose
 * version is below the aggregate's current MinVersion.
 *
 * [Sequence] holds at most one row per aggregate (its primary key), so the
 * join contributes a single MinVersion to compare against.
 *
 * @@ROWCOUNT here is the row count of the immediately preceding update;
 * nothing is returned unless that update reset the window.
 */
SELECT m.[Headers], m.[Payload]
FROM [Messages] AS m
INNER JOIN [Sequence] AS s
    ON s.[AggregateId] = m.[AggregateId]
WHERE m.[AggregateId] = @AggregateId
  AND m.[Version] < s.[MinVersion]
  AND @@ROWCOUNT > 0
ORDER BY m.[Version];
/*
 * Remove the messages of this aggregate whose version is below the
 * aggregate's current MinVersion -- the same rows the preceding SELECT
 * returned.
 *
 * @@ROWCOUNT here is the number of rows that SELECT returned; when it
 * returned none, nothing is deleted.
 */
DELETE m
FROM [Messages] AS m
WHERE m.[AggregateId] = @AggregateId
  AND @@ROWCOUNT > 0
  AND EXISTS
      (
          SELECT *
          FROM [Sequence] AS s
          WHERE s.[AggregateId] = m.[AggregateId]
            AND s.[MinVersion] > m.[Version]
      );
/* run messages through handlers here */
/* if message handlers succeed, commit; otherwise DB is left in a consistent state and the message is retried */
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment