Created
January 13, 2024 00:47
-
-
Save to11mtm/30e4aaf6619d77254b7a3d5b8188ee4e to your computer and use it in GitHub Desktop.
Nats Wrappers for Akka Streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using Akka; | |
using Akka.Streams.Dsl; | |
using Akka.Util; | |
using NATS.Client.Core; | |
using NATS.Client.JetStream; | |
using NATS.Client.JetStream.Models; | |
using NATSWrappers; | |
namespace GlutenFree.Akka.Streams.nats_net_v2; | |
public static partial class DSL | |
{ | |
public static Source<NatsJsWrapper<T>, NotUsed> WrappedJSFromContext<T>( | |
NatsJSContext context, | |
string stream, | |
ConsumerConfig config, | |
NatsJSConsumeOpts consumeOpts, | |
INatsSerializer<T> serializer = default, | |
NatsSubOpts? opts = default, | |
CancellationToken connectionCancellation = default, | |
CancellationToken readCancellation = default) | |
{ | |
return Source | |
.UnfoldResourceAsync<NatsJsWrapper<T>, (INatsJSConsumer, | |
IAsyncEnumerator<NatsJSMsg<T>>)>(async () => | |
{ | |
var consumer = await context.CreateOrUpdateConsumerAsync(stream, | |
config, connectionCancellation); | |
var consumerEnumerable = consumer.ConsumeAsync(serializer, | |
consumeOpts, readCancellation); | |
return (consumer, | |
consumerEnumerable.GetAsyncEnumerator(readCancellation)); | |
}, async consumer => | |
{ | |
var (c, ce) = consumer; | |
{ | |
if (await ce.MoveNextAsync()) | |
{ | |
return Option<NatsJsWrapper<T>>.Create( | |
new NatsJsWrapper<T>(ce.Current)); | |
} | |
else | |
{ | |
return Option<NatsJsWrapper<T>>.None; | |
} | |
} | |
}, async (sub) => { return Done.Instance; }); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using NATS.Client.Core; | |
using NATS.Client.JetStream; | |
using NATS.Client.KeyValueStore; | |
namespace NATSWrappers; | |
public static class Exts | |
{ | |
public static async ValueTask<CreateResult<T>> DoCreate<T>(this INatsKVStore store, string key, T value, bool writeOnDeleted = false, | |
INatsSerializer<T>? serializer = default, CancellationToken cancellationToken = default) | |
{ | |
try | |
{ | |
await store.UpdateAsync(key, value, revision: 0, serializer, cancellationToken); | |
return new CreateResult<T>(CreateResultStatus.Created, value); | |
} | |
catch (NatsKVWrongLastRevisionException) | |
{ | |
} | |
// If that fails, try to update an existing entry which may have been deleted | |
try | |
{ | |
var cv = await store.GetEntryAsync<T>(key, cancellationToken: cancellationToken); | |
return new CreateResult<T>(CreateResultStatus.AlreadyExisted, cv.Value!); | |
} | |
catch (NatsKVKeyDeletedException e) | |
{ | |
if (writeOnDeleted) | |
{ | |
await store.UpdateAsync(key, value, e.Revision, serializer, | |
cancellationToken); | |
return new CreateResult<T>(CreateResultStatus.Recreated, value); | |
} | |
else | |
{ | |
return new CreateResult<T>(CreateResultStatus.DeletedNoRecreate, | |
default); | |
} | |
} | |
throw new NatsKVCreateException(); | |
} | |
public static string? GetFirstOrNull(this NatsHeaders? headers, string key) | |
{ | |
if (headers != null && headers.TryGetValue(key, out var vals)) | |
{ | |
return vals.FirstOrDefault(key); | |
} | |
else | |
{ | |
return default; | |
} | |
} | |
} | |
public record NatsJsParentStateWrapper | |
{ | |
public NatsJsParentStateWrapper(ref NatsJSMsg<object> msg) | |
{ | |
throw new NotImplementedException(); | |
} | |
} | |
public record EventParentContext | |
{ | |
public string EventContextCreatedFrom { get; init; } | |
public ulong EventContextConsumerSequence { get; init; } | |
public NatsJSSequencePair EventContextSequence { get; init; } | |
public string EventContextConsumer { get; init; } | |
public DateTimeOffset EventContextTimeStamp { get; init; } | |
public string EventContextParentStream { get; init; } | |
public ulong EventContextStreamSequence { get; init; } | |
public string EventContextDomain { get; init; } | |
public string ParentStream { get; init; } | |
public string? EventContextWasResultOfStream { get; init; } | |
public string? EventContextWasResultOfEventSeq { get; init; } | |
} | |
public record NatsWrappingEvent<T> | |
{ | |
public T Value { get; init; } | |
public EventParentContext? Parent { get; init; } | |
} | |
public static class UpdateAction | |
{ | |
public static UpdateAction<T> Error<T>() => UpdateAction<T>.Error; | |
public static UpdateAction<T> Of<T>(T value) => UpdateAction<T>.Of(value); | |
public static UpdateAction<T> DoNothing<T>(T value) => UpdateAction<T>.DoNothing; | |
} | |
public readonly struct UpdateAction<T> | |
{ | |
public static readonly UpdateAction<T> Error = new UpdateAction<T>(0); | |
internal readonly int Code; | |
private UpdateAction(int code) | |
{ | |
Code = code; | |
Value = default!; | |
} | |
private UpdateAction(int code, T value) | |
{ | |
Code = code; | |
Value = value!; | |
} | |
public static readonly UpdateAction<T> DoNothing = | |
new UpdateAction<T>(1); | |
internal readonly T Value; | |
public static UpdateAction<T> Of(T value) | |
{ | |
return new UpdateAction<T>(2, value); | |
} | |
} | |
public readonly struct UpdateResult<TU> | |
{ | |
public readonly UpdateResultStatus Status; | |
public readonly TU Result; | |
internal UpdateResult(UpdateResultStatus status, TU result) | |
{ | |
Status = status; | |
Result = result; | |
} | |
} | |
public enum UpdateResultStatus : byte | |
{ | |
Updated, | |
DidNothing, | |
Error, | |
KeyNotFound, | |
KeyDeleted | |
} | |
public enum DoAck : byte | |
{ | |
DoAck, | |
NAck, | |
HandleAck, | |
StopRedelivery, | |
DumpToErrorQueue, | |
} | |
public sealed class NatsKVWrapper<T> | |
{ | |
internal static async ValueTask<UpdateResult<NatsWrappingEvent<T>>> UpdateValue<TState>(TState state, NatsKVContext k, string bucketName, string key, Func<TState, NatsWrappingEvent<T>,ValueTask<UpdateAction<T>>> update, EventParentContext? context) | |
{ | |
//Use Revision based update to 'optimistically' perform updates | |
var bucket = await k.CreateStoreAsync(bucketName); | |
while (true) | |
{ | |
NatsKVEntry<NatsWrappingEvent<T>> e = default; | |
try | |
{ | |
e = await bucket.GetEntryAsync<NatsWrappingEvent<T>>(key); | |
} | |
catch (NatsKVKeyNotFoundException exception) | |
{ | |
return new UpdateResult<NatsWrappingEvent<T>>( | |
UpdateResultStatus.KeyNotFound, default); | |
} | |
catch (NatsKVKeyDeletedException exception) | |
{ | |
return new UpdateResult<NatsWrappingEvent<T>>( | |
UpdateResultStatus.KeyDeleted, default); | |
} | |
{ | |
var oldVal = e.Value!; | |
var newv = await update(state,oldVal); | |
switch (newv.Code) | |
{ | |
case 0: | |
//todo: Error proper | |
return new UpdateResult<NatsWrappingEvent<T>>(UpdateResultStatus.Error,oldVal); | |
case 1: | |
return new UpdateResult<NatsWrappingEvent<T>>(UpdateResultStatus.DidNothing, oldVal); | |
case 2: | |
var forUpdate = oldVal with | |
{ | |
Parent = context, | |
Value = newv.Value | |
}; | |
try | |
{ | |
var newRev = await bucket.UpdateAsync(key, forUpdate, e.Revision); | |
return new UpdateResult<NatsWrappingEvent<T>>(UpdateResultStatus.Updated, forUpdate); | |
} | |
catch (NatsKVWrongLastRevisionException concFail) | |
{ | |
//inc counter or log | |
} | |
break; | |
} | |
} | |
} | |
await foreach (var b in bucket.WatchAsync<T>(null, null, default)) | |
{ | |
} | |
} | |
} | |
public record NatsJsWrapper<T> | |
{ | |
private EventParentContext context; | |
public NatsJsWrapper(NatsJSMsg<T> msg) | |
{ | |
if (msg.Headers != null) | |
{ | |
Msg = msg; | |
if (msg.Metadata.HasValue) | |
{ | |
var md = msg.Metadata.Value; | |
context = new EventParentContext() | |
{ | |
EventContextParentStream = md.Stream, | |
EventContextConsumer = md.Consumer, | |
EventContextSequence = md.Sequence, | |
EventContextTimeStamp = md.Timestamp, | |
EventContextConsumerSequence = md.Sequence.Consumer, | |
EventContextStreamSequence = md.Sequence.Stream, | |
EventContextDomain = md.Domain, | |
EventContextWasResultOfStream = | |
msg.Headers.GetFirstOrNull( | |
"result-of-event-stream"), | |
EventContextWasResultOfEventSeq = | |
msg.Headers.GetFirstOrNull("result-of-event-seq"), | |
EventContextCreatedFrom = msg.Subject | |
}; | |
} | |
else | |
{ | |
context = new EventParentContext() | |
{ | |
EventContextWasResultOfStream = | |
msg.Headers.GetFirstOrNull( | |
"result-of-event-stream"), | |
EventContextWasResultOfEventSeq = | |
msg.Headers.GetFirstOrNull("result-of-event-seq"), | |
EventContextCreatedFrom = msg.Subject, | |
}; | |
} | |
} | |
} | |
public NatsJSMsg<T> Msg { get; set; } | |
public async ValueTask Ack() | |
{ | |
await Msg.AckAsync(); | |
} | |
public async ValueTask NAck() | |
{ | |
await Msg.NakAsync(); | |
} | |
public async ValueTask AckError() | |
{ | |
await Msg.AckTerminateAsync(); | |
} | |
public ValueTask<UpdateResult<NatsWrappingEvent<TU>>> | |
UpdateValue<TU>(NatsKVContext k, string bucketName, string key, | |
Func<NatsWrappingEvent<TU>, ValueTask<UpdateAction<TU>>> | |
update, Func<UpdateResult<NatsWrappingEvent<TU>>, DoAck> ackOption) | |
{ | |
return UpdateValue(update, k, bucketName, key, | |
static (a, b) => a(b), ackOption); | |
} | |
public async ValueTask PublishOutput<TO>(NatsJSContext context, string subject, TO output) | |
{ | |
await context.PublishAsync(subject, | |
new NatsWrappingEvent<TO>() | |
{ Parent = this.context, Value = output }); | |
} | |
public async ValueTask<CreateResult<NatsWrappingEvent<TU>>> CreateValue<TU>(TU state, | |
NatsKVContext k, string bucketName, string key, string value) | |
{ | |
var s = await k.GetStoreAsync(bucketName); | |
var kv = await s.DoCreate(key, | |
new NatsWrappingEvent<TU>() | |
{ Value = state, Parent = context }); | |
return kv; | |
} | |
public async ValueTask<UpdateResult<NatsWrappingEvent<TU>>> UpdateValue<TState,TU>(TState state, NatsKVContext k, string bucketName, string key, Func<TState, NatsWrappingEvent<TU>,ValueTask<UpdateAction<TU>>> update, Func<UpdateResult<NatsWrappingEvent<TU>>, DoAck> ackOption) | |
{ | |
var res = await NatsKVWrapper<TU>.UpdateValue(state,k, bucketName, key, update, | |
context); | |
var dec = ackOption(res); | |
switch (dec) | |
{ | |
case DoAck.StopRedelivery: | |
await Msg.AckTerminateAsync(); | |
break; | |
case DoAck.HandleAck: | |
await Msg.AckAsync(); | |
break; | |
case DoAck.NAck: | |
await Msg.NakAsync(); | |
break; | |
case DoAck.DumpToErrorQueue: | |
await new NatsJSContext(ErrorConnection).PublishAsync( | |
$"error.processing.update.{bucketName}.{key}", "IDK"); | |
await Msg.NakAsync(); | |
break; | |
} | |
return res; | |
} | |
public NatsConnection ErrorConnection { get; } | |
} | |
public readonly struct CreateResult<T> | |
{ | |
public CreateResult(CreateResultStatus status, T result) | |
{ | |
Status = status; | |
Result = result; | |
} | |
public readonly CreateResultStatus Status; | |
public readonly T Result; | |
} | |
[Flags] | |
public enum CreateResultStatus | |
{ | |
Default = 0, | |
Created = 1, | |
AlreadyExisted = 2, | |
DeletedNoRecreate = 4, | |
Recreated = 5, | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment