Skip to content

Instantly share code, notes, and snippets.

@to11mtm
Created January 13, 2024 00:47
Show Gist options
  • Save to11mtm/30e4aaf6619d77254b7a3d5b8188ee4e to your computer and use it in GitHub Desktop.
Save to11mtm/30e4aaf6619d77254b7a3d5b8188ee4e to your computer and use it in GitHub Desktop.
Nats Wrappers for Akka Streams
using Akka;
using Akka.Streams.Dsl;
using Akka.Util;
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.JetStream.Models;
using NATSWrappers;
namespace GlutenFree.Akka.Streams.nats_net_v2;
public static partial class DSL
{
public static Source<NatsJsWrapper<T>, NotUsed> WrappedJSFromContext<T>(
NatsJSContext context,
string stream,
ConsumerConfig config,
NatsJSConsumeOpts consumeOpts,
INatsSerializer<T> serializer = default,
NatsSubOpts? opts = default,
CancellationToken connectionCancellation = default,
CancellationToken readCancellation = default)
{
return Source
.UnfoldResourceAsync<NatsJsWrapper<T>, (INatsJSConsumer,
IAsyncEnumerator<NatsJSMsg<T>>)>(async () =>
{
var consumer = await context.CreateOrUpdateConsumerAsync(stream,
config, connectionCancellation);
var consumerEnumerable = consumer.ConsumeAsync(serializer,
consumeOpts, readCancellation);
return (consumer,
consumerEnumerable.GetAsyncEnumerator(readCancellation));
}, async consumer =>
{
var (c, ce) = consumer;
{
if (await ce.MoveNextAsync())
{
return Option<NatsJsWrapper<T>>.Create(
new NatsJsWrapper<T>(ce.Current));
}
else
{
return Option<NatsJsWrapper<T>>.None;
}
}
}, async (sub) => { return Done.Instance; });
}
}
using NATS.Client.Core;
using NATS.Client.JetStream;
using NATS.Client.KeyValueStore;
namespace NATSWrappers;
public static class Exts
{
public static async ValueTask<CreateResult<T>> DoCreate<T>(this INatsKVStore store, string key, T value, bool writeOnDeleted = false,
INatsSerializer<T>? serializer = default, CancellationToken cancellationToken = default)
{
try
{
await store.UpdateAsync(key, value, revision: 0, serializer, cancellationToken);
return new CreateResult<T>(CreateResultStatus.Created, value);
}
catch (NatsKVWrongLastRevisionException)
{
}
// If that fails, try to update an existing entry which may have been deleted
try
{
var cv = await store.GetEntryAsync<T>(key, cancellationToken: cancellationToken);
return new CreateResult<T>(CreateResultStatus.AlreadyExisted, cv.Value!);
}
catch (NatsKVKeyDeletedException e)
{
if (writeOnDeleted)
{
await store.UpdateAsync(key, value, e.Revision, serializer,
cancellationToken);
return new CreateResult<T>(CreateResultStatus.Recreated, value);
}
else
{
return new CreateResult<T>(CreateResultStatus.DeletedNoRecreate,
default);
}
}
throw new NatsKVCreateException();
}
public static string? GetFirstOrNull(this NatsHeaders? headers, string key)
{
if (headers != null && headers.TryGetValue(key, out var vals))
{
return vals.FirstOrDefault(key);
}
else
{
return default;
}
}
}
public record NatsJsParentStateWrapper
{
public NatsJsParentStateWrapper(ref NatsJSMsg<object> msg)
{
throw new NotImplementedException();
}
}
public record EventParentContext
{
public string EventContextCreatedFrom { get; init; }
public ulong EventContextConsumerSequence { get; init; }
public NatsJSSequencePair EventContextSequence { get; init; }
public string EventContextConsumer { get; init; }
public DateTimeOffset EventContextTimeStamp { get; init; }
public string EventContextParentStream { get; init; }
public ulong EventContextStreamSequence { get; init; }
public string EventContextDomain { get; init; }
public string ParentStream { get; init; }
public string? EventContextWasResultOfStream { get; init; }
public string? EventContextWasResultOfEventSeq { get; init; }
}
public record NatsWrappingEvent<T>
{
public T Value { get; init; }
public EventParentContext? Parent { get; init; }
}
public static class UpdateAction
{
public static UpdateAction<T> Error<T>() => UpdateAction<T>.Error;
public static UpdateAction<T> Of<T>(T value) => UpdateAction<T>.Of(value);
public static UpdateAction<T> DoNothing<T>(T value) => UpdateAction<T>.DoNothing;
}
public readonly struct UpdateAction<T>
{
public static readonly UpdateAction<T> Error = new UpdateAction<T>(0);
internal readonly int Code;
private UpdateAction(int code)
{
Code = code;
Value = default!;
}
private UpdateAction(int code, T value)
{
Code = code;
Value = value!;
}
public static readonly UpdateAction<T> DoNothing =
new UpdateAction<T>(1);
internal readonly T Value;
public static UpdateAction<T> Of(T value)
{
return new UpdateAction<T>(2, value);
}
}
public readonly struct UpdateResult<TU>
{
public readonly UpdateResultStatus Status;
public readonly TU Result;
internal UpdateResult(UpdateResultStatus status, TU result)
{
Status = status;
Result = result;
}
}
public enum UpdateResultStatus : byte
{
Updated,
DidNothing,
Error,
KeyNotFound,
KeyDeleted
}
public enum DoAck : byte
{
DoAck,
NAck,
HandleAck,
StopRedelivery,
DumpToErrorQueue,
}
public sealed class NatsKVWrapper<T>
{
internal static async ValueTask<UpdateResult<NatsWrappingEvent<T>>> UpdateValue<TState>(TState state, NatsKVContext k, string bucketName, string key, Func<TState, NatsWrappingEvent<T>,ValueTask<UpdateAction<T>>> update, EventParentContext? context)
{
//Use Revision based update to 'optimistically' perform updates
var bucket = await k.CreateStoreAsync(bucketName);
while (true)
{
NatsKVEntry<NatsWrappingEvent<T>> e = default;
try
{
e = await bucket.GetEntryAsync<NatsWrappingEvent<T>>(key);
}
catch (NatsKVKeyNotFoundException exception)
{
return new UpdateResult<NatsWrappingEvent<T>>(
UpdateResultStatus.KeyNotFound, default);
}
catch (NatsKVKeyDeletedException exception)
{
return new UpdateResult<NatsWrappingEvent<T>>(
UpdateResultStatus.KeyDeleted, default);
}
{
var oldVal = e.Value!;
var newv = await update(state,oldVal);
switch (newv.Code)
{
case 0:
//todo: Error proper
return new UpdateResult<NatsWrappingEvent<T>>(UpdateResultStatus.Error,oldVal);
case 1:
return new UpdateResult<NatsWrappingEvent<T>>(UpdateResultStatus.DidNothing, oldVal);
case 2:
var forUpdate = oldVal with
{
Parent = context,
Value = newv.Value
};
try
{
var newRev = await bucket.UpdateAsync(key, forUpdate, e.Revision);
return new UpdateResult<NatsWrappingEvent<T>>(UpdateResultStatus.Updated, forUpdate);
}
catch (NatsKVWrongLastRevisionException concFail)
{
//inc counter or log
}
break;
}
}
}
await foreach (var b in bucket.WatchAsync<T>(null, null, default))
{
}
}
}
public record NatsJsWrapper<T>
{
private EventParentContext context;
public NatsJsWrapper(NatsJSMsg<T> msg)
{
if (msg.Headers != null)
{
Msg = msg;
if (msg.Metadata.HasValue)
{
var md = msg.Metadata.Value;
context = new EventParentContext()
{
EventContextParentStream = md.Stream,
EventContextConsumer = md.Consumer,
EventContextSequence = md.Sequence,
EventContextTimeStamp = md.Timestamp,
EventContextConsumerSequence = md.Sequence.Consumer,
EventContextStreamSequence = md.Sequence.Stream,
EventContextDomain = md.Domain,
EventContextWasResultOfStream =
msg.Headers.GetFirstOrNull(
"result-of-event-stream"),
EventContextWasResultOfEventSeq =
msg.Headers.GetFirstOrNull("result-of-event-seq"),
EventContextCreatedFrom = msg.Subject
};
}
else
{
context = new EventParentContext()
{
EventContextWasResultOfStream =
msg.Headers.GetFirstOrNull(
"result-of-event-stream"),
EventContextWasResultOfEventSeq =
msg.Headers.GetFirstOrNull("result-of-event-seq"),
EventContextCreatedFrom = msg.Subject,
};
}
}
}
public NatsJSMsg<T> Msg { get; set; }
public async ValueTask Ack()
{
await Msg.AckAsync();
}
public async ValueTask NAck()
{
await Msg.NakAsync();
}
public async ValueTask AckError()
{
await Msg.AckTerminateAsync();
}
public ValueTask<UpdateResult<NatsWrappingEvent<TU>>>
UpdateValue<TU>(NatsKVContext k, string bucketName, string key,
Func<NatsWrappingEvent<TU>, ValueTask<UpdateAction<TU>>>
update, Func<UpdateResult<NatsWrappingEvent<TU>>, DoAck> ackOption)
{
return UpdateValue(update, k, bucketName, key,
static (a, b) => a(b), ackOption);
}
public async ValueTask PublishOutput<TO>(NatsJSContext context, string subject, TO output)
{
await context.PublishAsync(subject,
new NatsWrappingEvent<TO>()
{ Parent = this.context, Value = output });
}
public async ValueTask<CreateResult<NatsWrappingEvent<TU>>> CreateValue<TU>(TU state,
NatsKVContext k, string bucketName, string key, string value)
{
var s = await k.GetStoreAsync(bucketName);
var kv = await s.DoCreate(key,
new NatsWrappingEvent<TU>()
{ Value = state, Parent = context });
return kv;
}
public async ValueTask<UpdateResult<NatsWrappingEvent<TU>>> UpdateValue<TState,TU>(TState state, NatsKVContext k, string bucketName, string key, Func<TState, NatsWrappingEvent<TU>,ValueTask<UpdateAction<TU>>> update, Func<UpdateResult<NatsWrappingEvent<TU>>, DoAck> ackOption)
{
var res = await NatsKVWrapper<TU>.UpdateValue(state,k, bucketName, key, update,
context);
var dec = ackOption(res);
switch (dec)
{
case DoAck.StopRedelivery:
await Msg.AckTerminateAsync();
break;
case DoAck.HandleAck:
await Msg.AckAsync();
break;
case DoAck.NAck:
await Msg.NakAsync();
break;
case DoAck.DumpToErrorQueue:
await new NatsJSContext(ErrorConnection).PublishAsync(
$"error.processing.update.{bucketName}.{key}", "IDK");
await Msg.NakAsync();
break;
}
return res;
}
public NatsConnection ErrorConnection { get; }
}
public readonly struct CreateResult<T>
{
public CreateResult(CreateResultStatus status, T result)
{
Status = status;
Result = result;
}
public readonly CreateResultStatus Status;
public readonly T Result;
}
[Flags]
public enum CreateResultStatus
{
Default = 0,
Created = 1,
AlreadyExisted = 2,
DeletedNoRecreate = 4,
Recreated = 5,
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment