Created
December 26, 2021 04:11
-
-
Save to11mtm/1cdaf0a3a0d47f0236889e79ce2c8b61 to your computer and use it in GitHub Desktop.
NETCORE Grpc helpers for Akka.Streams
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System;
using System.Threading.Tasks;
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Util;
using Google.Protobuf;
using Grpc.Core;
namespace GrpcAkkaStreamHelpers | |
{ | |
/// <summary>
/// Wraps a source queue whose elements pair a value with a
/// <see cref="TaskCompletionSource"/>, so that a producer can await the
/// outcome of each individual value it queues.
/// </summary>
/// <typeparam name="T">Type of the values being queued.</typeparam>
public class TCSWriteSinkQueue<T>
{
    /// <summary>
    /// The underlying queue that <see cref="QueueValue"/> offers to.
    /// </summary>
    public ISourceQueueWithComplete<(T, TaskCompletionSource)> queue;

    /// <summary>
    /// Creates a wrapper around an already materialized queue.
    /// </summary>
    /// <param name="queueWithComplete">The queue values will be offered to.</param>
    public TCSWriteSinkQueue(
        ISourceQueueWithComplete<(T, TaskCompletionSource)> queueWithComplete)
    {
        queue = queueWithComplete;
    }

    /// <summary>
    /// Queues a value for write.
    /// The returned task completes when whoever consumes the queue completes
    /// the promise that travels with the value, and faults if the offer
    /// itself is not accepted (failure, dropped, queue closed, or any
    /// unrecognized offer result).
    /// </summary>
    /// <param name="value">The value to be enqueued.</param>
    /// <returns>A task representing the outcome of the queued value.</returns>
    public async Task QueueValue(T value)
    {
        // Continuations run asynchronously so that completing the promise
        // never executes the awaiting caller's code inline on the completer.
        var promise = new TaskCompletionSource(
            TaskCreationOptions.RunContinuationsAsynchronously);

        var result = await queue.OfferAsync((value, promise));
        switch (result)
        {
            case QueueOfferResult.Enqueued _:
                // Accepted: the consumer of the queue now owns the promise.
                // NOTE(review): if the stream later discards an already
                // enqueued element without processing it, nothing visible
                // here completes the promise - confirm the overflow
                // strategy in use cannot do that.
                break;
            case QueueOfferResult.Failure f:
                promise.TrySetException(
                    new Exception("Failed to queue item", f.Cause));
                break;
            case QueueOfferResult.Dropped _:
                promise.TrySetException(new Exception(
                    "Failed to enqueue item, the queue buffer was full"));
                break;
            case QueueOfferResult.QueueClosed _:
                promise.TrySetException(new Exception(
                    "Failed to enqueue item, the queue was closed."));
                break;
            default:
                // Without this arm an unrecognized (or null) result would
                // leave the promise uncompleted and the await below would
                // never return.
                promise.TrySetException(new Exception(
                    $"Failed to enqueue item, unexpected offer result: {result}"));
                break;
        }

        await promise.Task;
    }
}
/// <summary>
/// Helpers that adapt gRPC stream readers and writers (and plain async
/// delegates) to Akka.Streams sources and sinks.
/// </summary>
public static class GrpcStreamHelpers
{
    /// <summary>
    /// Treats this writer as a write stage: every element offered to the
    /// materialized queue is written with <c>WriteAsync</c> and then emitted
    /// downstream for additional processing.
    /// Exceptions thrown by the write are not caught by this stage.
    /// </summary>
    /// <param name="writer">The writer being used.</param>
    /// <param name="bufferSize">Number of elements to buffer.</param>
    /// <param name="overflowStrategy">What to do when the buffer is full.</param>
    /// <typeparam name="T">Type of the messages being written.</typeparam>
    /// <returns>A source materializing the queue that feeds the writer.</returns>
    public static Source<T, ISourceQueueWithComplete<T>> AsWriteStage<T>(
        this IClientStreamWriter<T> writer, int bufferSize,
        OverflowStrategy overflowStrategy) where T : class =>
        Source.Queue<T>(bufferSize, overflowStrategy).SelectAsync(1,
            async element =>
            {
                await writer.WriteAsync(element);
                return element;
            });

    /// <summary>
    /// Makes a simple TCS completion queue that writes to
    /// <paramref name="writer"/> and discards the written elements. Since
    /// nothing is left to consume downstream, a runnable graph is returned.
    /// </summary>
    /// <param name="writer">The writer being used.</param>
    /// <param name="bufferSize">Number of elements to buffer.</param>
    /// <param name="overflowStrategy">What to do when the buffer is full.</param>
    /// <typeparam name="T">Type of the messages being written.</typeparam>
    /// <returns>
    /// A runnable graph materializing the <see cref="TCSWriteSinkQueue{T}"/>
    /// used to queue values.
    /// </returns>
    public static IRunnableGraph<TCSWriteSinkQueue<T>> TCSQueueAsyncSink<T>(
        this IAsyncStreamWriter<T> writer, int bufferSize,
        OverflowStrategy overflowStrategy) where T : class =>
        writer.AsTCSWriteStage(bufferSize, overflowStrategy)
            .ToMaterialized(Sink.Ignore<T>(), Keep.Left);

    /// <summary>
    /// Builds a TCS completion queue that writes each queued element to
    /// <paramref name="writer"/>, completes the element's promise with the
    /// outcome of that write, and emits the element downstream.
    /// </summary>
    /// <param name="writer">The writer being used.</param>
    /// <param name="bufferSize">Number of elements to buffer.</param>
    /// <param name="overflowStrategy">What to do when the buffer is full.</param>
    /// <typeparam name="T">Type of the messages being written.</typeparam>
    /// <returns>
    /// A source materializing the <see cref="TCSWriteSinkQueue{T}"/> used to
    /// queue values.
    /// </returns>
    public static Source<T, TCSWriteSinkQueue<T>> AsTCSWriteStage<T>(
        this IAsyncStreamWriter<T> writer, int bufferSize,
        OverflowStrategy overflowStrategy) where T : class =>
        // A lambda (not a method group) keeps a null writer failing inside
        // the per-element try block, where it is reported on the promise.
        QueueWithTCSWriteStage<T>(
            message => writer.WriteAsync(message),
            bufferSize, overflowStrategy);

    /// <summary>
    /// Builds a TCS completion queue around an arbitrary asynchronous write
    /// action. For each queued element the action is awaited; the element's
    /// promise is completed on success or faulted with the thrown exception,
    /// and the element is emitted downstream in either case.
    /// </summary>
    /// <param name="writer">The asynchronous action performing the write.</param>
    /// <param name="bufferSize">Number of elements to buffer.</param>
    /// <param name="overflowStrategy">What to do when the buffer is full.</param>
    /// <typeparam name="T">Type of the values being written.</typeparam>
    /// <returns>
    /// A source materializing the <see cref="TCSWriteSinkQueue{T}"/> used to
    /// queue values.
    /// </returns>
    public static Source<T, TCSWriteSinkQueue<T>> QueueWithTCSWriteStage<T>(
        Func<T, Task> writer, int bufferSize,
        OverflowStrategy overflowStrategy) where T : class =>
        Source
            .Queue<(T, TaskCompletionSource)>(bufferSize, overflowStrategy)
            .MapMaterializedValue(q => new TCSWriteSinkQueue<T>(q))
            .SelectAsync(1,
                async entry =>
                {
                    var (item, tcs) = entry;
                    try
                    {
                        await writer(item);
                        // Try* variants: a promise that was already completed
                        // elsewhere must not throw here and escape the stage.
                        tcs.TrySetResult();
                    }
                    catch (Exception e)
                    {
                        tcs.TrySetException(e);
                    }

                    return item;
                });

    /// <summary>
    /// Exposes this writer as a sink that writes every incoming element with
    /// <c>WriteAsync</c>, one element at a time.
    /// </summary>
    /// <param name="writer">The writer being used.</param>
    /// <typeparam name="T">Type of the messages being written.</typeparam>
    /// <returns>A sink materializing the task of the terminal ignore sink.</returns>
    public static Sink<T, Task> AsSink<T>(this IAsyncStreamWriter<T> writer)
        where T : class =>
        Flow.Create<T>().SelectAsync(1, async message =>
        {
            await writer.WriteAsync(message);
            return NotUsed.Instance;
        }).ToMaterialized(Sink.Ignore<NotUsed>(), Keep.Right);

    /// <summary>
    /// Creates a sink that awaits <paramref name="action"/> for every
    /// incoming element, one element at a time.
    /// </summary>
    /// <param name="action">The asynchronous action to run per element.</param>
    /// <typeparam name="TIn">Type of the incoming elements.</typeparam>
    /// <returns>A sink materializing the task of the terminal ignore sink.</returns>
    public static Sink<TIn, Task>
        ForEachAsync<TIn>(Func<TIn, Task> action) => Flow.Create<TIn>()
        .SelectAsync(1, async element =>
        {
            await action(element);
            return NotUsed.Instance;
        }).ToMaterialized(Sink.Ignore<NotUsed>(), Keep.Right)
        .Named("foreachasyncSink");

    /// <summary>
    /// Exposes this reader as a source: the reader is advanced with
    /// <c>MoveNext</c> and each <c>Current</c> value is emitted until
    /// <c>MoveNext</c> reports that no further element is available.
    /// </summary>
    /// <param name="reader">The reader to pull messages from.</param>
    /// <typeparam name="T">Type of the messages being read.</typeparam>
    /// <returns>A source of the messages produced by the reader.</returns>
    public static Source<T, NotUsed> AsSource<T>(
        this IAsyncStreamReader<T> reader) where T : class =>
        Source.UnfoldAsync(reader, async stream =>
        {
            if (await stream.MoveNext())
            {
                var current = stream.Current;
                // The reader itself is the unfold state for the next pull.
                return (stream, current);
            }

            // No state and no element: the unfold ends here.
            return Option<(IAsyncStreamReader<T>, T)>.None;
        });
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment