Skip to content

Instantly share code, notes, and snippets.

@to11mtm
Created December 26, 2021 04:11
Show Gist options
  • Save to11mtm/1cdaf0a3a0d47f0236889e79ce2c8b61 to your computer and use it in GitHub Desktop.
Save to11mtm/1cdaf0a3a0d47f0236889e79ce2c8b61 to your computer and use it in GitHub Desktop.
.NET Core gRPC helpers for Akka.Streams
using System;
using System.Threading.Tasks;
using Akka;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Util;
using Google.Protobuf;
using Grpc.Core;
namespace GrpcAkkaStreamHelpers
{
/// <summary>
/// Wraps a materialized Akka.Streams source queue whose elements pair a value
/// with a <see cref="TaskCompletionSource"/>, so that a producer can await the
/// outcome of each individual element it enqueues.
/// </summary>
/// <remarks>
/// This class only completes the promise itself when the offer is rejected.
/// For accepted elements, whatever stream consumes the queue is expected to
/// complete the promise that travels with the element.
/// NOTE(review): with overflow strategies that evict already-buffered elements
/// (presumably DropHead / DropTail / DropBuffer), or when the stream terminates
/// with elements still buffered, the evicted element's promise is probably
/// never completed and its caller would wait forever - confirm against the
/// Akka.Streams documentation before using those strategies.
/// </remarks>
/// <typeparam name="T">The type of the values being queued.</typeparam>
public class TCSWriteSinkQueue<T>
{
    /// <summary>
    /// The underlying queue that value/promise pairs are offered to.
    /// </summary>
    public ISourceQueueWithComplete<(T, TaskCompletionSource)> queue;

    /// <summary>
    /// Creates a wrapper around an already materialized queue.
    /// </summary>
    /// <param name="queueWithComplete">The queue to offer elements to.</param>
    public TCSWriteSinkQueue(
        ISourceQueueWithComplete<(T, TaskCompletionSource)> queueWithComplete)
    {
        queue = queueWithComplete;
    }

    /// <summary>
    /// Queues a value for write.
    /// The returned task completes when the promise paired with the value is
    /// completed by the stream, and faults if the write fails or if the value
    /// could not be enqueued (offer failed, element dropped, queue closed).
    /// </summary>
    /// <param name="value">The value to be enqueued.</param>
    /// <returns>A task representing the outcome of this single element.</returns>
    public async Task QueueValue(T value)
    {
        // Continuations run asynchronously so that the code completing the
        // promise is never hijacked to run the caller's continuation inline.
        var promise = new TaskCompletionSource(
            TaskCreationOptions.RunContinuationsAsynchronously);

        var result = await queue.OfferAsync((value, promise))
            .ConfigureAwait(false);

        switch (result)
        {
            case QueueOfferResult.Enqueued _:
                // Accepted: the stream now owns completing the promise.
                break;
            case QueueOfferResult.Failure f:
                promise.TrySetException(
                    new Exception("Failed to queue item", f.Cause));
                break;
            case QueueOfferResult.Dropped _:
                promise.TrySetException(new Exception(
                    "Failed to enqueue item, the queue buffer was full"));
                break;
            case QueueOfferResult.QueueClosed _:
                promise.TrySetException(new Exception(
                    "Failed to enqueue item, the queue was closed."));
                break;
            default:
                // A null or unrecognised result means nothing will ever
                // complete the promise; fail it rather than hang the caller.
                promise.TrySetException(new InvalidOperationException(
                    "Failed to enqueue item, unexpected offer result: " +
                    (result?.GetType().Name ?? "null")));
                break;
        }

        await promise.Task.ConfigureAwait(false);
    }
}
/// <summary>
/// Helpers that adapt gRPC stream readers/writers (and plain async delegates)
/// to Akka.Streams sources, flows-as-sources and sinks.
/// </summary>
public static class GrpcStreamHelpers
{
    /// <summary>
    /// Treats this writer as a write stage: every element offered to the
    /// materialized queue is written with <c>WriteAsync</c> (one write at a
    /// time) and then emitted downstream for additional processing.
    /// </summary>
    /// <remarks>
    /// A write exception is not caught here, so it propagates out of the
    /// <c>SelectAsync</c> stage (presumably failing the stream unless a
    /// supervision strategy says otherwise - confirm).
    /// </remarks>
    /// <param name="writer">The writer being used.</param>
    /// <param name="bufferSize">Number of elements to buffer.</param>
    /// <param name="overflowStrategy">What to do when the buffer is full.</param>
    /// <typeparam name="T">The message type.</typeparam>
    /// <returns>A source of the written elements, materializing the input queue.</returns>
    public static Source<T, ISourceQueueWithComplete<T>> AsWriteStage<T>(
        this IClientStreamWriter<T> writer, int bufferSize,
        OverflowStrategy overflowStrategy) where T : class =>
        Source.Queue<T>(bufferSize, overflowStrategy).SelectAsync(1,
            async src =>
            {
                await writer.WriteAsync(src);
                return src;
            });

    /// <summary>
    /// Makes a simple TCS completion queue in the 'optimal' state. Since this
    /// stage means nothing is left to process, the written elements are
    /// discarded and a runnable graph is returned.
    /// </summary>
    /// <param name="writer">The writer each queued element is written to.</param>
    /// <param name="bufferSize">Number of elements to buffer.</param>
    /// <param name="overflowStrategy">What to do when the buffer is full.</param>
    /// <typeparam name="T">The message type.</typeparam>
    /// <returns>
    /// A runnable graph that materializes the <see cref="TCSWriteSinkQueue{T}"/>
    /// used to enqueue values.
    /// </returns>
    public static IRunnableGraph<TCSWriteSinkQueue<T>> TCSQueueAsyncSink<T>(
        this IAsyncStreamWriter<T> writer, int bufferSize,
        OverflowStrategy overflowStrategy) where T : class =>
        writer
            .AsTCSWriteStage(bufferSize, overflowStrategy)
            // The emitted elements carry no further information; drop them.
            .ToMaterialized(Sink.Ignore<T>(), Keep.Left);

    /// <summary>
    /// Treats this writer as a write stage whose queue reports the outcome of
    /// every write through a per-element <see cref="TaskCompletionSource"/>.
    /// Each element is emitted downstream after its write attempt.
    /// </summary>
    /// <param name="writer">The writer each queued element is written to.</param>
    /// <param name="bufferSize">Number of elements to buffer.</param>
    /// <param name="overflowStrategy">What to do when the buffer is full.</param>
    /// <typeparam name="T">The message type.</typeparam>
    /// <returns>
    /// A source of the processed elements, materializing the
    /// <see cref="TCSWriteSinkQueue{T}"/> used to enqueue values.
    /// </returns>
    public static Source<T, TCSWriteSinkQueue<T>> AsTCSWriteStage<T>(
        this IAsyncStreamWriter<T> writer, int bufferSize,
        OverflowStrategy overflowStrategy) where T : class =>
        // The lambda (rather than a method group) defers touching 'writer'
        // until a write happens, inside the stage's try/catch.
        QueueWithTCSWriteStage<T>(
            item => writer.WriteAsync(item), bufferSize, overflowStrategy);

    /// <summary>
    /// Builds a queue-backed source that runs <paramref name="writer"/> for
    /// every queued element (one at a time) and reports the outcome through
    /// the element's <see cref="TaskCompletionSource"/>.
    /// </summary>
    /// <remarks>
    /// An exception thrown by <paramref name="writer"/> is captured on the
    /// element's promise and is NOT propagated to the stream; the element is
    /// still emitted downstream after a failed write.
    /// </remarks>
    /// <param name="writer">The asynchronous action performed per element.</param>
    /// <param name="bufferSize">Number of elements to buffer.</param>
    /// <param name="overflowStrategy">What to do when the buffer is full.</param>
    /// <typeparam name="T">The element type.</typeparam>
    /// <returns>
    /// A source of the processed elements, materializing the
    /// <see cref="TCSWriteSinkQueue{T}"/> used to enqueue values.
    /// </returns>
    public static Source<T, TCSWriteSinkQueue<T>> QueueWithTCSWriteStage<T>(
        Func<T, Task> writer, int bufferSize,
        OverflowStrategy overflowStrategy) where T : class =>
        Source
            .Queue<(T, TaskCompletionSource)>(bufferSize, overflowStrategy)
            .MapMaterializedValue(r => new TCSWriteSinkQueue<T>(r))
            .SelectAsync(1,
                async src =>
                {
                    var (item, tcs) = src;
                    try
                    {
                        await writer(item);
                        // Try* variants: an already-completed promise must not
                        // throw here and take the whole stream down.
                        tcs.TrySetResult();
                    }
                    catch (Exception e)
                    {
                        tcs.TrySetException(e);
                    }
                    return item;
                });

    /// <summary>
    /// Exposes this writer as a sink that writes every incoming element with
    /// <c>WriteAsync</c>, one write at a time.
    /// </summary>
    /// <param name="writer">The writer each element is written to.</param>
    /// <typeparam name="T">The message type.</typeparam>
    /// <returns>
    /// A sink whose materialized task is the one produced by the terminal
    /// <c>Sink.Ignore</c> stage.
    /// </returns>
    public static Sink<T, Task> AsSink<T>(this IAsyncStreamWriter<T> writer)
        where T : class =>
        Flow.Create<T>().SelectAsync(1, async input =>
        {
            await writer.WriteAsync(input);
            return NotUsed.Instance;
        }).ToMaterialized(Sink.Ignore<NotUsed>(), Keep.Right);

    /// <summary>
    /// Creates a sink that awaits <paramref name="action"/> for every incoming
    /// element, one invocation at a time.
    /// </summary>
    /// <param name="action">The asynchronous action to run per element.</param>
    /// <typeparam name="TIn">The element type.</typeparam>
    /// <returns>
    /// A sink named "foreachasyncSink" whose materialized task is the one
    /// produced by the terminal <c>Sink.Ignore</c> stage.
    /// </returns>
    public static Sink<TIn, Task>
        ForEachAsync<TIn>(Func<TIn, Task> action) => Flow.Create<TIn>()
        .SelectAsync(1, async input =>
        {
            await action(input);
            return NotUsed.Instance;
        }).ToMaterialized(Sink.Ignore<NotUsed>(), Keep.Right)
        .Named("foreachasyncSink");

    /// <summary>
    /// Exposes this reader as a source: each pull calls <c>MoveNext</c> and
    /// emits <c>Current</c>; the source ends once <c>MoveNext</c> yields false.
    /// </summary>
    /// <param name="reader">The reader to pull elements from.</param>
    /// <typeparam name="T">The message type.</typeparam>
    /// <returns>A source of the elements read from <paramref name="reader"/>.</returns>
    public static Source<T, NotUsed> AsSource<T>(
        this IAsyncStreamReader<T> reader) where T : class =>
        Source.UnfoldAsync(reader, async (str) =>
        {
            if (await str.MoveNext())
            {
                var curr = str.Current;
                // The reader itself is the unfold state carried to the next pull.
                return (str, curr);
            }

            // No more elements: returning None ends the unfold.
            return Option<(IAsyncStreamReader<T>, T
                )>.None;
        });
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment