Created
January 17, 2024 21:55
-
-
Save 11mtm/e169c3ec59bfcef5253798005155d72f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Runtime.Serialization; | |
using System.Text.Json; | |
using Microsoft.Extensions.Primitives; | |
using NATS.Client.Core; | |
using NATS.Client.JetStream; | |
using NATS.Client.JetStream.Models; | |
using NATS.Client.KeyValueStore; | |
namespace NATSWrappers; | |
//via https://stackoverflow.com/a/75795756/2937845 | |
/// <summary>
/// Extension helpers for NATS key/value stores and message headers.
/// </summary>
public static class Exts
{
    /// <summary>
    /// Attempts to create <paramref name="key"/> with <paramref name="value"/>, reporting
    /// what state the key was found in.
    /// </summary>
    /// <param name="store">Key/value store to write to.</param>
    /// <param name="key">Key to create.</param>
    /// <param name="value">Value to store when the key is created or recreated.</param>
    /// <param name="writeOnDeleted">
    /// When <c>true</c>, a key that is found deleted is rewritten with <paramref name="value"/>;
    /// when <c>false</c>, a deleted key is left untouched.
    /// </param>
    /// <param name="serializer">Optional serializer used for both the writes and the read-back.</param>
    /// <param name="cancellationToken">Token forwarded to every store call.</param>
    /// <returns>
    /// <see cref="CreateResultStatus.Created"/> or <see cref="CreateResultStatus.Recreated"/> with
    /// <paramref name="value"/>; <see cref="CreateResultStatus.AlreadyExisted"/> with the stored value;
    /// or <see cref="CreateResultStatus.DeletedNoRecreate"/> with a default result.
    /// </returns>
    public static async ValueTask<CreateResult<T>> DoCreate<T>(this INatsKVStore store, string key, T value, bool writeOnDeleted = false,
        INatsSerializer<T>? serializer = default, CancellationToken cancellationToken = default)
    {
        try
        {
            // An update against revision 0 is the create attempt; a wrong-last-revision
            // failure is treated below as "something is already stored under this key".
            await store.UpdateAsync(key, value, revision: 0, serializer, cancellationToken);
            return new CreateResult<T>(CreateResultStatus.Created, value);
        }
        catch (NatsKVWrongLastRevisionException)
        {
            // Fall through and inspect the existing (possibly deleted) entry.
        }

        try
        {
            // Read back with the same serializer the caller supplied for the write,
            // so custom formats round-trip.
            var existing = await store.GetEntryAsync<T>(key, serializer: serializer,
                cancellationToken: cancellationToken);
            return new CreateResult<T>(CreateResultStatus.AlreadyExisted, existing.Value!);
        }
        catch (NatsKVKeyDeletedException e)
        {
            if (!writeOnDeleted)
            {
                return new CreateResult<T>(CreateResultStatus.DeletedNoRecreate, default!);
            }

            // Write on top of the delete marker's revision so a concurrent writer is detected.
            await store.UpdateAsync(key, value, e.Revision, serializer, cancellationToken);
            return new CreateResult<T>(CreateResultStatus.Recreated, value);
        }
    }

    /// <summary>
    /// Returns the first value stored under <paramref name="key"/>, or <c>null</c> when
    /// <paramref name="headers"/> is null, the key is absent, or the key has no values.
    /// </summary>
    /// <param name="headers">Headers to search; may be null.</param>
    /// <param name="key">Header name to look up.</param>
    public static string? GetFirstOrNull(this NatsHeaders? headers, string key)
    {
        if (headers is not null && headers.TryGetValue(key, out var vals))
        {
            // Parameterless overload on purpose: FirstOrDefault(key) would return the
            // header *name* as the fallback when the value list is empty.
            return vals.FirstOrDefault();
        }

        return null;
    }
}
/// <summary>
/// Placeholder wrapper around a JetStream message; construction is not implemented yet.
/// </summary>
public record NatsJsParentStateWrapper
{
    /// <summary>Always throws <see cref="NotImplementedException"/>.</summary>
    /// <param name="msg">Message that would be wrapped.</param>
    public NatsJsParentStateWrapper(ref NatsJSMsg<object> msg)
        => throw new NotImplementedException();
}
/// <summary>
/// Describes the message an event or stored value was derived from.
/// NatsJsWrapper fills this in from the incoming message's subject, headers and,
/// when present, its JetStream metadata.
/// </summary>
public record EventParentContext
{
    /// <summary>Subject of the originating message.</summary>
    public string EventContextCreatedFrom { get; init; }

    /// <summary>Consumer part of the originating message's sequence pair.</summary>
    public ulong EventContextConsumerSequence { get; init; }

    /// <summary>Full sequence pair taken from the originating message's metadata.</summary>
    public NatsJSSequencePair EventContextSequence { get; init; }

    /// <summary>Consumer name taken from the originating message's metadata.</summary>
    public string EventContextConsumer { get; init; }

    /// <summary>Timestamp taken from the originating message's metadata.</summary>
    public DateTimeOffset EventContextTimeStamp { get; init; }

    /// <summary>Stream name taken from the originating message's metadata.</summary>
    public string EventContextParentStream { get; init; }

    /// <summary>Stream part of the originating message's sequence pair.</summary>
    public ulong EventContextStreamSequence { get; init; }

    /// <summary>Domain taken from the originating message's metadata.</summary>
    public string EventContextDomain { get; init; }

    // NOTE(review): nothing in this file assigns ParentStream; confirm whether it is still needed.
    public string ParentStream { get; init; }

    /// <summary>Value of the "result-of-event-stream" header, if any.</summary>
    public string? EventContextWasResultOfStream { get; init; }

    /// <summary>Value of the "result-of-event-seq" header, if any.</summary>
    public string? EventContextWasResultOfEventSeq { get; init; }
}
/// <summary>
/// Envelope published to JetStream: a payload plus where it came from and why it was produced.
/// </summary>
/// <typeparam name="T">Payload type.</typeparam>
public record NatsWrappingEvent<T>
{
    /// <summary>The payload being published.</summary>
    public T Value { get; init; }

    /// <summary>Context of the message this event was derived from, if any.</summary>
    public EventParentContext? Parent { get; init; }

    /// <summary>Why this event was produced.</summary>
    public EventCause Cause { get; init; }
}
/// <summary>
/// Value stored in a key/value bucket together with the context and cause of its last write.
/// </summary>
/// <typeparam name="T">Type of the stored value.</typeparam>
public record CausalityWrappedValue<T>
{
    /// <summary>The current value.</summary>
    public T Value { get; init; }

    /// <summary>Context of the message that led to this value, if any.</summary>
    public EventParentContext? Parent { get; init; }

    /// <summary>Why this value was written.</summary>
    public EventCause Cause { get; init; }

    /// <summary>Optional earlier value; the only mutable member of this record.</summary>
    public SlimOption<T> PreviousValue { get; set; }
}
/// <summary>
/// Minimal option type: a value plus a flag saying whether the value is meaningful.
/// </summary>
/// <typeparam name="T">Type of the wrapped value.</typeparam>
// NOTE(review): state is exposed as public fields; serializers that only consider
// properties will skip them. Confirm the serializer in use handles fields.
public readonly struct SlimOption<T>
{
    /// <summary><c>true</c> when <see cref="Value"/> was supplied as a present value.</summary>
    public readonly bool HasValue;

    /// <summary>The wrapped value; default when the option was built empty.</summary>
    public readonly T? Value;

    /// <summary>Creates an option that holds <paramref name="value"/>.</summary>
    public SlimOption(T value)
        : this(true, value)
    {
    }

    /// <summary>Creates an option with an explicit presence flag.</summary>
    /// <param name="hasValue">Whether <paramref name="value"/> should be treated as present.</param>
    /// <param name="value">The value to carry, stored regardless of the flag.</param>
    public SlimOption(bool hasValue, T value)
    {
        (HasValue, Value) = (hasValue, value);
    }

    /// <summary>Creates an empty option.</summary>
    public SlimOption()
        : this(false, default!)
    {
    }
}
/// <summary>
/// Lightweight description of why a change is being made; copied field-by-field into an
/// <see cref="EventCause"/> when a value is updated.
/// </summary>
/// <param name="Cause">Category of the cause.</param>
/// <param name="CauseName">Name identifying the cause.</param>
/// <param name="CauseData">Free-form data attached to the cause.</param>
public readonly record struct CauseReason(CauseEnum Cause, string CauseName, string CauseData);
/// <summary>
/// Records why an event or stored value was produced.
/// </summary>
public record EventCause
{
    /// <summary>Category of the cause.</summary>
    public CauseEnum Cause { get; init; }

    /// <summary>Name identifying the cause.</summary>
    public string CauseName { get; init; }

    /// <summary>Free-form data attached to the cause.</summary>
    public string CauseData { get; init; }

    /// <summary>Context of the message that triggered the cause, if any.</summary>
    public EventParentContext? ParentContext { get; init; }
}
/// <summary>
/// Category of what caused an event or update. Each member carries an
/// <see cref="EnumMemberAttribute"/> with its own name as the value.
/// </summary>
public enum CauseEnum
{
    /// <summary>Cause is not known; also the default value.</summary>
    [EnumMember(Value = "Unknown")]
    Unknown = 0,

    /// <summary>Caused by an event.</summary>
    [EnumMember(Value = "Event")]
    Event = 1,

    /// <summary>Caused by a command.</summary>
    [EnumMember(Value = "Command")]
    Command = 2
}
/// <summary>
/// Factory shortcuts for the generic <c>UpdateAction&lt;T&gt;</c> struct, letting the type
/// argument be supplied or inferred at the call site.
/// </summary>
public static class UpdateAction
{
    /// <summary>Returns the shared "error" action for <typeparamref name="T"/>.</summary>
    public static UpdateAction<T> Error<T>()
    {
        return UpdateAction<T>.Error;
    }

    /// <summary>Returns an action that carries <paramref name="value"/> as the new value.</summary>
    public static UpdateAction<T> Of<T>(T value)
    {
        return UpdateAction<T>.Of(value);
    }

    /// <summary>
    /// Returns the shared "do nothing" action for <typeparamref name="T"/>.
    /// </summary>
    /// <param name="value">Not stored; only lets the compiler infer <typeparamref name="T"/>.</param>
    public static UpdateAction<T> DoNothing<T>(T value)
    {
        return UpdateAction<T>.DoNothing;
    }
}
/// <summary>
/// Decision returned by an update callback: fail, leave the value alone, or replace it.
/// </summary>
/// <typeparam name="T">Type of the value being updated.</typeparam>
public readonly struct UpdateAction<T>
{
    // Discriminator values stored in Code; consumers switch on these numbers.
    private const int ErrorCode = 0;
    private const int DoNothingCode = 1;
    private const int ReplaceCode = 2;

    /// <summary>Action signalling that the update failed (code 0, same as the default struct).</summary>
    public static readonly UpdateAction<T> Error = new(ErrorCode);

    /// <summary>Action signalling that the stored value should be left unchanged (code 1).</summary>
    public static readonly UpdateAction<T> DoNothing = new(DoNothingCode);

    /// <summary>Discriminator: 0 = error, 1 = do nothing, 2 = replace with <see cref="Value"/>.</summary>
    internal readonly int Code;

    /// <summary>Replacement value; only meaningful when <see cref="Code"/> is 2.</summary>
    internal readonly T Value;

    private UpdateAction(int code)
        : this(code, default!)
    {
    }

    private UpdateAction(int code, T value)
    {
        Code = code;
        Value = value!;
    }

    /// <summary>Creates an action that replaces the stored value with <paramref name="value"/>.</summary>
    public static UpdateAction<T> Of(T value) => new(ReplaceCode, value);
}
/// <summary>
/// Outcome of an update attempt: a status plus the value associated with it.
/// </summary>
/// <typeparam name="TU">Type of the result value.</typeparam>
public readonly struct UpdateResult<TU>
{
    /// <summary>How the update attempt ended.</summary>
    public readonly UpdateResultStatus Status;

    /// <summary>Value associated with <see cref="Status"/>; may be default.</summary>
    public readonly TU Result;

    internal UpdateResult(UpdateResultStatus status, TU result)
    {
        (Status, Result) = (status, result);
    }
}
/// <summary>
/// Possible outcomes of an update against a key/value entry.
/// </summary>
public enum UpdateResultStatus : byte
{
    /// <summary>The new value was written.</summary>
    Updated = 0,

    /// <summary>The update callback chose to leave the value unchanged.</summary>
    DidNothing = 1,

    /// <summary>The update callback reported an error.</summary>
    Error = 2,

    /// <summary>No entry exists for the key.</summary>
    KeyNotFound = 3,

    /// <summary>The key exists but is marked deleted.</summary>
    KeyDeleted = 4
}
/// <summary>
/// Acknowledgement decision returned by a caller after an update has been attempted.
/// </summary>
public enum DoAck : byte
{
    // NOTE(review): NatsJsWrapper.UpdateValue has no switch case for this member,
    // so no acknowledgement call is made for it; confirm that is intended.
    DoAck = 0,

    /// <summary>Negatively acknowledge the message.</summary>
    NAck = 1,

    /// <summary>Acknowledge the message.</summary>
    HandleAck = 2,

    /// <summary>Terminate the message so it is not redelivered.</summary>
    StopRedelivery = 3,

    /// <summary>Publish to an error subject, then negatively acknowledge.</summary>
    DumpToErrorQueue = 4,
}
/// <summary>
/// Extension entry point for revision-checked updates on a key/value context.
/// </summary>
public static class KVExts
{
    /// <summary>
    /// Forwards to <c>NatsKVWrapper&lt;T&gt;.UpdateValue</c> with the same arguments
    /// (state and context swapped into that method's parameter order).
    /// </summary>
    /// <param name="context">Key/value context that owns the bucket.</param>
    /// <param name="state">Caller state passed through to <paramref name="update"/>.</param>
    /// <param name="bucketName">Name of the bucket holding the key.</param>
    /// <param name="key">Key to update.</param>
    /// <param name="reason">Cause recorded on the written value.</param>
    /// <param name="update">Callback deciding what to do with the current value.</param>
    /// <param name="parentContext">Context recorded on the written value, if any.</param>
    public static ValueTask<UpdateResult<CausalityWrappedValue<T>>> UpdateValue<TState, T>(
        this NatsKVContext context,
        TState state,
        string bucketName,
        string key,
        CauseReason reason,
        Func<TState, CausalityWrappedValue<T>, ValueTask<UpdateAction<T>>> update,
        EventParentContext? parentContext)
        => NatsKVWrapper<T>.UpdateValue(state, context, bucketName, key, reason, update, parentContext);
}
/// <summary>
/// Revision-checked ("optimistic") update helper for values stored as
/// <see cref="CausalityWrappedValue{T}"/> in a NATS key/value bucket.
/// </summary>
/// <typeparam name="T">Type of the wrapped value.</typeparam>
public sealed class NatsKVWrapper<T>
{
    /// <summary>
    /// Reads the current entry for <paramref name="key"/>, asks <paramref name="update"/> what to do
    /// with it, and writes the result back against the revision that was read. When the write loses
    /// a revision race the whole read/decide/write cycle is repeated.
    /// </summary>
    /// <param name="state">Caller state handed to <paramref name="update"/> on every attempt.</param>
    /// <param name="k">Key/value context used to obtain the bucket.</param>
    /// <param name="bucketName">Name of the bucket holding the key.</param>
    /// <param name="key">Key to update.</param>
    /// <param name="reason">Cause copied into the written value's <see cref="EventCause"/>.</param>
    /// <param name="update">
    /// Callback receiving the state and the current value. It may be invoked more than once
    /// when writes are retried.
    /// </param>
    /// <param name="context">Parent context recorded on the written value, if any.</param>
    /// <returns>
    /// <see cref="UpdateResultStatus.KeyNotFound"/> or <see cref="UpdateResultStatus.KeyDeleted"/> with a
    /// default result; <see cref="UpdateResultStatus.Error"/> or <see cref="UpdateResultStatus.DidNothing"/>
    /// with the value that was read; or <see cref="UpdateResultStatus.Updated"/> with the value written.
    /// </returns>
    internal static async ValueTask<UpdateResult<CausalityWrappedValue<T>>>
        UpdateValue<TState>(TState state, NatsKVContext k, string bucketName,
            string key, CauseReason reason,
            Func<TState, CausalityWrappedValue<T>, ValueTask<UpdateAction<T>>>
                update, EventParentContext? context)
    {
        // Use revision-based updates to 'optimistically' perform the write.
        var bucket = await k.CreateStoreAsync(bucketName);

        // The loop is only left by returning; every failed revision check starts a new attempt.
        // NOTE(review): there is no retry limit or back-off.
        while (true)
        {
            NatsKVEntry<CausalityWrappedValue<T>> entry;
            try
            {
                entry = await bucket.GetEntryAsync<CausalityWrappedValue<T>>(key);
            }
            catch (NatsKVKeyNotFoundException)
            {
                return new UpdateResult<CausalityWrappedValue<T>>(
                    UpdateResultStatus.KeyNotFound, default!);
            }
            catch (NatsKVKeyDeletedException)
            {
                return new UpdateResult<CausalityWrappedValue<T>>(
                    UpdateResultStatus.KeyDeleted, default!);
            }

            var oldVal = entry.Value!;
            var action = await update(state, oldVal);

            // Codes mirror UpdateAction<T>: 0 = Error, 1 = DoNothing, 2 = Of(value).
            switch (action.Code)
            {
                case 0:
                    //todo: Error proper
                    return new UpdateResult<CausalityWrappedValue<T>>(
                        UpdateResultStatus.Error, oldVal);

                case 1:
                    return new UpdateResult<CausalityWrappedValue<T>>(
                        UpdateResultStatus.DidNothing, oldVal);

                case 2:
                {
                    // NOTE(review): PreviousValue is carried over from the old record unchanged;
                    // confirm whether it should be set to the value being replaced.
                    var forUpdate = oldVal with
                    {
                        Parent = context,
                        Value = action.Value,
                        Cause = new EventCause
                        {
                            Cause = reason.Cause,
                            CauseData = reason.CauseData,
                            CauseName = reason.CauseName,
                            ParentContext = context
                        }
                    };

                    try
                    {
                        // Only succeeds if nobody wrote the key since it was read above.
                        await bucket.UpdateAsync(key, forUpdate, entry.Revision);
                        return new UpdateResult<CausalityWrappedValue<T>>(
                            UpdateResultStatus.Updated, forUpdate);
                    }
                    catch (NatsKVWrongLastRevisionException)
                    {
                        // Lost the race: fall out of the switch and retry with a fresh read.
                        //inc counter or log
                    }

                    break;
                }
            }
        }
    }
}
/// <summary>
/// Publishing helpers for a JetStream context.
/// </summary>
public static class NatsJSContextHelpers
{
    /// <summary>
    /// Wraps <paramref name="outVal"/> in a <see cref="NatsWrappingEvent{T}"/> whose cause is derived
    /// from <paramref name="command"/> and publishes it to <paramref name="subject"/>.
    /// </summary>
    /// <param name="ctx">JetStream context used to publish.</param>
    /// <param name="subject">Subject to publish to.</param>
    /// <param name="command">Command handed to <paramref name="causeProducer"/>.</param>
    /// <param name="outVal">Payload of the published event.</param>
    /// <param name="causeProducer">Builds the event's cause from the command.</param>
    /// <param name="context">Parent context recorded on the event, if any.</param>
    /// <returns>The publish acknowledgement returned by the context.</returns>
    public static async ValueTask<PubAckResponse> PublishForCommand<TCommand, TOut>(
        this NatsJSContext ctx,
        string subject,
        TCommand command,
        TOut outVal,
        Func<TCommand, EventCause> causeProducer,
        EventParentContext? context)
    {
        var wrapped = new NatsWrappingEvent<TOut>
        {
            Cause = causeProducer(command),
            Parent = context,
            Value = outVal
        };

        return await ctx.PublishAsync(subject, wrapped);
    }
}
/// <summary>
/// Wraps a received JetStream message together with an <see cref="EventParentContext"/> derived
/// from it, and offers helpers that acknowledge the message, publish follow-up events and
/// create or update key/value entries with that context attached.
/// </summary>
/// <typeparam name="T">Payload type of the wrapped message.</typeparam>
public record NatsJsWrapper<T>
{
    // Context built once from the wrapped message; attached to everything this wrapper writes.
    private EventParentContext context;

    /// <summary>
    /// Wraps <paramref name="msg"/> and captures its subject, causality headers and, when present,
    /// its JetStream metadata.
    /// </summary>
    /// <param name="msg">Message to wrap.</param>
    /// <param name="errorConnection">
    /// Connection used when dumping to the error subject; falls back to the message's own connection.
    /// </param>
    public NatsJsWrapper(NatsJSMsg<T> msg,
        NatsConnection? errorConnection = null)
    {
        ErrorConnection = errorConnection ?? msg.Connection;

        // Always keep the message: Ack/NAck/UpdateValue all operate on it, including for
        // messages that arrive without headers.
        Msg = msg;

        // GetFirstOrNull tolerates a null header collection, so no guard is needed here.
        var resultOfStream = msg.Headers.GetFirstOrNull("result-of-event-stream");
        var resultOfSeq = msg.Headers.GetFirstOrNull("result-of-event-seq");

        if (msg.Metadata.HasValue)
        {
            var md = msg.Metadata.Value;
            context = new EventParentContext()
            {
                EventContextParentStream = md.Stream,
                EventContextConsumer = md.Consumer,
                EventContextSequence = md.Sequence,
                EventContextTimeStamp = md.Timestamp,
                EventContextConsumerSequence = md.Sequence.Consumer,
                EventContextStreamSequence = md.Sequence.Stream,
                EventContextDomain = md.Domain,
                EventContextWasResultOfStream = resultOfStream,
                EventContextWasResultOfEventSeq = resultOfSeq,
                EventContextCreatedFrom = msg.Subject
            };
        }
        else
        {
            // No JetStream metadata: only the subject and causality headers are known.
            context = new EventParentContext()
            {
                EventContextWasResultOfStream = resultOfStream,
                EventContextWasResultOfEventSeq = resultOfSeq,
                EventContextCreatedFrom = msg.Subject,
            };
        }
    }

    /// <summary>The wrapped message.</summary>
    public NatsJSMsg<T> Msg { get; set; }

    /// <summary>Acknowledges the wrapped message.</summary>
    public async ValueTask Ack()
    {
        await Msg.AckAsync();
    }

    /// <summary>Negatively acknowledges the wrapped message.</summary>
    public async ValueTask NAck()
    {
        await Msg.NakAsync();
    }

    /// <summary>Terminates the wrapped message.</summary>
    public async ValueTask AckError()
    {
        await Msg.AckTerminateAsync();
    }

    /// <summary>
    /// Stateless convenience overload: runs the stateful <c>UpdateValue</c> with the
    /// <paramref name="update"/> delegate itself as the state.
    /// </summary>
    /// <param name="k">Key/value context that owns the bucket.</param>
    /// <param name="bucketName">Name of the bucket holding the key.</param>
    /// <param name="key">Key to update.</param>
    /// <param name="update">Callback deciding what to do with the current value.</param>
    /// <param name="ackOption">Maps the update result to an acknowledgement decision.</param>
    public ValueTask<UpdateResult<CausalityWrappedValue<TU>>>
        UpdateValue<TU>(NatsKVContext k, string bucketName, string key,
            Func<CausalityWrappedValue<TU>, ValueTask<UpdateAction<TU>>>
                update,
            Func<UpdateResult<CausalityWrappedValue<TU>>, DoAck> ackOption)
    {
        // The delegate travels as state so the adapter lambda can stay static (no closure).
        return UpdateValue(update, k, bucketName, key,
            static (a, b) => a(b), ackOption);
    }

    /// <summary>
    /// Publishes <paramref name="output"/> to <paramref name="subject"/>, wrapped in an event whose
    /// parent is this wrapper's context. No cause is set on the event.
    /// </summary>
    /// <returns>The publish acknowledgement returned by the context.</returns>
    public async ValueTask<PubAckResponse> PublishOutput<TO>(
        NatsJSContext context, string subject, TO output)
    {
        return await context.PublishAsync(subject,
            new NatsWrappingEvent<TO>()
                { Parent = this.context, Value = output });
    }

    /// <summary>
    /// Creates <paramref name="key"/> in <paramref name="bucketName"/> holding
    /// <paramref name="state"/> wrapped with this wrapper's context and <paramref name="cause"/>.
    /// </summary>
    /// <param name="state">Value to store.</param>
    /// <param name="k">Key/value context that owns the bucket.</param>
    /// <param name="bucketName">Name of an existing bucket.</param>
    /// <param name="key">Key to create.</param>
    /// <param name="value">Currently unused; kept for signature compatibility.</param>
    /// <param name="cause">Cause recorded on the stored value.</param>
    /// <returns>The result reported by <c>DoCreate</c>.</returns>
    public async ValueTask<CreateResult<CausalityWrappedValue<TU>>>
        CreateValue<TU>(TU state,
            NatsKVContext k, string bucketName, string key, string value,
            EventCause cause)
    {
        var store = await k.GetStoreAsync(bucketName);
        return await store.DoCreate(key,
            new CausalityWrappedValue<TU>()
                { Value = state, Parent = context, PreviousValue = default, Cause = cause });
    }

    /// <summary>
    /// Performs a revision-checked update of <paramref name="key"/> and then acknowledges the wrapped
    /// message according to <paramref name="ackOption"/>.
    /// </summary>
    /// <param name="state">Caller state passed to <paramref name="update"/>.</param>
    /// <param name="k">Key/value context that owns the bucket.</param>
    /// <param name="bucketName">Name of the bucket holding the key.</param>
    /// <param name="key">Key to update.</param>
    /// <param name="update">Callback deciding what to do with the current value.</param>
    /// <param name="ackOption">Maps the update result to an acknowledgement decision.</param>
    /// <param name="causeType">
    /// Cause name to record; defaults to the runtime type name of the message payload, or empty.
    /// </param>
    /// <param name="causeData">Cause data to record; defaults to empty.</param>
    /// <returns>The update result, after the acknowledgement step has run.</returns>
    public async ValueTask<UpdateResult<CausalityWrappedValue<TU>>> UpdateValue<
        TState, TU>(
        TState state,
        NatsKVContext k,
        string bucketName,
        string key,
        Func<TState, CausalityWrappedValue<TU>, ValueTask<UpdateAction<TU>>>
            update,
        Func<UpdateResult<CausalityWrappedValue<TU>>, DoAck> ackOption,
        string? causeType = null,
        string? causeData = null
    )
    {
        var reason = new CauseReason(
            CauseEnum.Event,
            causeType ?? Msg.Data?.GetType().FullName ?? string.Empty,
            causeData ?? string.Empty);

        var res = await NatsKVWrapper<TU>.UpdateValue(state, k, bucketName, key,
            reason, update, context);

        var dec = ackOption(res);
        switch (dec)
        {
            case DoAck.StopRedelivery:
                await Msg.AckTerminateAsync();
                break;
            case DoAck.HandleAck:
                await Msg.AckAsync();
                break;
            case DoAck.NAck:
                await Msg.NakAsync();
                break;
            case DoAck.DumpToErrorQueue:
                // Record the failure on the error subject, then NAK the original message.
                await new NatsJSContext((NatsConnection)ErrorConnection!)
                    .PublishAsync(
                        $"error.processing.update.{bucketName}.{key}", "IDK");
                await Msg.NakAsync();
                break;
            case DoAck.DoAck:
                // NOTE(review): no acknowledgement is sent for DoAck.DoAck (unchanged from the
                // original behaviour); confirm whether this member should ack the message.
                break;
        }

        return res;
    }

    /// <summary>Connection used for error-subject publishing.</summary>
    public INatsConnection? ErrorConnection { get; }
}
/// <summary>
/// Outcome of a create attempt: a status plus the value associated with it.
/// </summary>
/// <typeparam name="T">Type of the result value.</typeparam>
public readonly struct CreateResult<T>
{
    /// <summary>How the create attempt ended.</summary>
    public readonly CreateResultStatus Status;

    /// <summary>Value associated with <see cref="Status"/>; may be default.</summary>
    public readonly T Result;

    /// <summary>Creates a result from its two parts.</summary>
    public CreateResult(CreateResultStatus status, T result)
    {
        (Status, Result) = (status, result);
    }
}
/// <summary>
/// Possible outcomes of a create attempt against a key/value entry.
/// </summary>
[Flags]
public enum CreateResultStatus
{
    /// <summary>No status; the default value.</summary>
    Default = 0,

    /// <summary>The key did not exist and was created.</summary>
    Created = 1 << 0,

    /// <summary>The key already held a value.</summary>
    AlreadyExisted = 1 << 1,

    /// <summary>The key was found deleted and was left that way.</summary>
    DeletedNoRecreate = 1 << 2,

    /// <summary>The key was found deleted and was written again.</summary>
    // Numerically 5, i.e. the Created and DeletedNoRecreate bits combined.
    Recreated = Created | DeletedNoRecreate,
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment