Skip to content

Instantly share code, notes, and snippets.

@elexisvenator
Created July 13, 2024 18:33
Show Gist options
  • Save elexisvenator/66b7b8a2f414b3a17333e932335613e1 to your computer and use it in GitHub Desktop.
Save elexisvenator/66b7b8a2f414b3a17333e932335613e1 to your computer and use it in GitHub Desktop.
bulk quick append that handles multiple streams and events in a single query
using System.IO;
using System.Linq;
using Marten.Schema;
using Marten.Storage;
using Marten.Storage.Metadata;
using Weasel.Core;
using Weasel.Postgresql.Functions;
namespace Marten.Events.Schema;
public class BulkQuickAppendEventFunction: Function
{
private readonly EventGraph _events;
public BulkQuickAppendEventFunction(EventGraph events) : base(new DbObjectName(events.DatabaseSchemaName, "mt_bulk_quick_append_events"))
{
_events = events;
}
public override void WriteCreateStatement(Migrator migrator, TextWriter writer)
{
var streamIdType = _events.GetStreamIdDBType();
var databaseSchema = _events.DatabaseSchemaName;
var tenancyStyle = _events.TenancyStyle;
var table = new EventsTable(_events);
var metadataFunctionParams = "";
var metadataSqlArgs = "";
var maxArg = 6;
var metadataInsertRefs = "";
var metadataColumns = "";
if (table.Columns.OfType<CausationIdColumn>().Any())
{
maxArg++;
metadataFunctionParams += ", causation_ids text[]";
metadataSqlArgs += $", ${maxArg}";
metadataColumns += ", " + CausationIdColumn.ColumnName;
metadataInsertRefs += ", e." + CausationIdColumn.ColumnName;
}
if (table.Columns.OfType<CorrelationIdColumn>().Any())
{
maxArg++;
metadataFunctionParams += $", correlation_ids text[]";
metadataColumns += ", " + CorrelationIdColumn.ColumnName;
metadataInsertRefs += ", e." + CorrelationIdColumn.ColumnName;
}
if (table.Columns.OfType<HeadersColumn>().Any())
{
maxArg++;
metadataFunctionParams += ", headers text[]";
metadataColumns += ", " + HeadersColumn.ColumnName;
metadataInsertRefs += ", e." + HeadersColumn.ColumnName;
}
maxArg = tenancyStyle == TenancyStyle.Conjoined ? maxArg + 1 : maxArg;
var tenantFunctionArg = tenancyStyle == TenancyStyle.Conjoined ? ", tenant_id text" : "";
var tenantSqlRef = tenancyStyle == TenancyStyle.Conjoined ? $"${maxArg} as tenant_id," : "";
var tenantColumn = tenancyStyle == TenancyStyle.Conjoined ? "tenant_id, " : "";
writer.WriteLine(
$"""
CREATE OR REPLACE FUNCTION {Identifier}(streams {streamIdType}[], stream_types text[], event_ids uuid[], event_types text[], dotnet_types text[], bodies jsonb[]{metadataFunctionParams}{tenantFunctionArg})
RETURNS bigint[]
LANGUAGE sql
AS $function$
/* At some threshold, this really should just be a tablecopy operation */
with new_events as (
select
row_number() over (partition by stream_id) as temp_version,
stream_id,
stream_type,
event_id,
event_type,
dotnet_type,
body{metadataColumns}
from unnest($1, $2, $3, $4, $5, $6{metadataSqlArgs})
as new_events(stream_id, stream_type, event_id, event_type, dotnet_type, body{metadataColumns})
),
updated_streams as (
insert into {databaseSchema}.mt_streams (id, {tenantColumn}type, version, timestamp)
select
new_events.stream_id,
{tenantSqlRef}
new_events.stream_type,
count(1),
now()
from new_events
group by (1,2,3,5)
on conflict ({tenantColumn}id)
do update
set version = mt_streams.version + excluded.version,
timestamp = excluded.timestamp
returning id, version
),
updated_events as (
insert into {databaseSchema}.mt_events
(seq_id, id, stream_id, version, data, type, {tenantColumn}{SchemaConstants.DotNetTypeColumn}, is_archived{metadataColumns})
select
nextval('{databaseSchema}.mt_events_sequence'),
e.event_id,
s.id,
/* stream version is updated before events are written, work backwards to get the event version */
s.version + 1 - (row_number() over (partition by s.id order by temp_version desc)),
e.body,
e.event_type,
{tenantSqlRef}
e.dotnet_type,
false{metadataInsertRefs}
from new_events e
/* Left join so we can insert nulls if there is no match - this will throw an error and abort the operation */
left join updated_streams s on s.id = e.stream_id
returning seq_id
)
select array_agg(seq_id)
from updated_events
$function$;
""");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment