Skip to content

Instantly share code, notes, and snippets.

@doctorpangloss
Created October 7, 2021 16:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save doctorpangloss/10f0b6fb461c99369abc15f2f4327a1f to your computer and use it in GitHub Desktop.
Save doctorpangloss/10f0b6fb461c99369abc15f2f4327a1f to your computer and use it in GitHub Desktop.
Improving semantics around gRPC on Unity
using System;
using System.Collections.Generic;
using Grpc.Core;
using UniRx;
using UniRx.Diagnostics;
using UnityEngine;
using System;
using System.Collections.Concurrent;
using System.Threading;
using Cysharp.Threading.Tasks;
using Channel = Cysharp.Threading.Tasks.Channel;
namespace HiddenSwitch.Grpc.ComponentModel
{
/// <summary>
/// Extension methods that adapt gRPC client call objects (unary responses, response streams and request stream
/// writers) to UniRx observables and queued writers.
/// </summary>
public static class AsyncCallExtensions
{
    /// <summary>
    /// Exposes the response of an <see cref="AsyncUnaryCall{TResponse}"/> as an observable scheduled on
    /// <see cref="Scheduler.MainThread"/>.
    /// </summary>
    /// <param name="call">The unary call to observe, typically created by calling a <c>...Async()</c> method on
    /// <see cref="DefaultClient.legacyClient"/>.</param>
    /// <typeparam name="T">The response type of the call.</typeparam>
    /// <returns>An observable built from <see cref="AsyncUnaryCall{TResponse}.ResponseAsync"/>. It is intended to
    /// signal the call's underlying exception through <c>OnError</c> rather than one wrapped in
    /// <see cref="AggregateException"/> (this depends on the UniRx task conversion; confirm against the UniRx
    /// version in use). When compiled with <c>UNITY_EDITOR</c>, errors are additionally passed to
    /// <see cref="Debug.LogError(object)"/>.</returns>
    public static IObservable<T> ToObservable<T>(this AsyncUnaryCall<T> call)
    {
        IObservable<T> response = call.ResponseAsync.ToObservable(Scheduler.MainThread);
#if UNITY_EDITOR
        // Editor-only: log failures in addition to forwarding them to subscribers.
        response = response.DoOnError(Debug.LogError);
#endif
        return response;
    }

    /// <summary>
    /// Wraps a gRPC response stream in a <see cref="GrpcStreamObservable{T}"/> and observes it on the main thread.
    /// </summary>
    /// <param name="reader">The response stream to read messages from.</param>
    /// <typeparam name="T">The type of message produced by the stream.</typeparam>
    /// <returns>An observable of the stream's messages. The underlying <see cref="GrpcStreamObservable{T}"/>
    /// accepts only a single subscription.</returns>
    public static IObservable<T> AsObservable<T>(this IAsyncStreamReader<T> reader)
    {
        var stream = new GrpcStreamObservable<T>(reader);
        return stream.ObserveOnMainThread();
    }

    /// <summary>
    /// Wraps a client stream writer in a <see cref="QueuedAsyncWriteable{T}"/>.
    /// </summary>
    /// <param name="streamWriter">The request stream writer to wrap.</param>
    /// <typeparam name="T">The type of message written to the stream.</typeparam>
    /// <returns>A new <see cref="QueuedAsyncWriteable{T}"/> around <paramref name="streamWriter"/>.</returns>
    public static QueuedAsyncWriteable<T> AsQueuedWritable<T>(this IClientStreamWriter<T> streamWriter) =>
        new QueuedAsyncWriteable<T>(streamWriter);
}
/// <summary>
/// An observable over an <see cref="IAsyncStreamReader{T}"/> that can be subscribed to exactly once.
/// </summary>
/// <typeparam name="T">The type of message produced by the stream.</typeparam>
public class GrpcStreamObservable<T> : IObservable<T>
{
    private readonly IAsyncStreamReader<T> m_Reader;
    private readonly CancellationToken m_Token;

    // 0 until the first Subscribe call, 1 from then on.
    private int m_Used;

    /// <summary>
    /// Parameterless constructor for derived types; leaves the reader and token at their default values.
    /// </summary>
    protected GrpcStreamObservable()
    {
    }

    /// <summary>
    /// Creates an observable over <paramref name="reader"/>.
    /// </summary>
    /// <param name="reader">The stream handed to the subscription created by <see cref="Subscribe"/>.</param>
    /// <param name="token">A token handed to the subscription alongside the reader.</param>
    public GrpcStreamObservable(IAsyncStreamReader<T> reader, CancellationToken token = default(CancellationToken))
    {
        m_Used = 0;
        m_Token = token;
        m_Reader = reader;
    }

    /// <summary>
    /// Creates a <see cref="GrpcStreamSubscription{T}"/> that delivers the stream to <paramref name="observer"/>.
    /// </summary>
    /// <param name="observer">The observer to notify.</param>
    /// <returns>The subscription, as an <see cref="IDisposable"/>.</returns>
    /// <exception cref="InvalidOperationException">Thrown on every call after the first.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        // Atomically claim the one allowed subscription.
        var alreadySubscribed = Interlocked.Exchange(ref m_Used, 1) != 0;
        if (alreadySubscribed)
        {
            throw new InvalidOperationException("Subscribe can only be called once.");
        }

        return new GrpcStreamSubscription<T>(m_Reader, observer, m_Token);
    }
}
/// <summary>
/// Reads an <see cref="IAsyncStreamReader{T}"/> and forwards each message to an <see cref="IObserver{T}"/> until
/// the stream ends, fails, or is cancelled. Disposing the subscription cancels the read loop.
/// </summary>
/// <typeparam name="T">The type of message produced by the stream.</typeparam>
public class GrpcStreamSubscription<T> : IDisposable
{
    private readonly CancellationTokenSource m_TokenSource;

    // Set once the read loop has delivered its terminal notification (OnCompleted or OnError).
    private bool m_Completed;

    /// <summary>
    /// Starts reading from <paramref name="reader"/> immediately and forwards messages to
    /// <paramref name="observer"/>.
    /// </summary>
    /// <param name="reader">The stream to read messages from.</param>
    /// <param name="observer">The observer that receives messages and the terminal notification.</param>
    /// <param name="token">Cancelling this token stops the read loop, just like disposing the subscription.</param>
    public GrpcStreamSubscription(IAsyncStreamReader<T> reader, IObserver<T> observer, CancellationToken token)
    {
        // A linked source unregisters itself from the caller's token when it is disposed, so a cancellation of
        // that token after Dispose() cannot invoke Cancel() on an already-disposed source.
        m_TokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
        Run(reader, observer, m_TokenSource.Token).Forget();
    }

    /// <summary>
    /// The read loop: pulls messages one at a time and pushes them to the observer.
    /// </summary>
    /// <param name="reader">The stream to read messages from.</param>
    /// <param name="observer">The observer to notify.</param>
    /// <param name="token">Token that ends the loop when cancelled.</param>
    private async UniTaskVoid Run(IAsyncStreamReader<T> reader, IObserver<T> observer, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                if (!await reader.MoveNext(token))
                {
                    // End of stream.
                    break;
                }
            }
            // TODO: decide whether StatusCode.NotFound should also be treated as a peaceful end of stream.
            catch (RpcException e) when (e.StatusCode == StatusCode.Cancelled)
            {
                // A cancelled call ends peacefully with OnCompleted.
                break;
            }
            catch (OperationCanceledException)
            {
                // Token cancellation is also treated as a peaceful end.
                break;
            }
            catch (Exception e)
            {
                // Any other failure terminates the sequence with an error.
                observer.OnError(e);
                m_Completed = true;
                return;
            }

            observer.OnNext(reader.Current);
        }

        m_Completed = true;
        observer.OnCompleted();
    }

    /// <summary>
    /// Cancels the read loop if it has not already finished or been cancelled, then releases the token source.
    /// The read loop responds to cancellation by calling <see cref="IObserver{T}.OnCompleted"/>.
    /// </summary>
    public void Dispose()
    {
        if (!m_Completed && !m_TokenSource.IsCancellationRequested)
        {
            m_TokenSource.Cancel();
        }

        m_TokenSource.Dispose();
    }
}
/// <summary>
/// Wraps an <see cref="IClientStreamWriter{T}"/> so that callers can issue overlapping <see cref="WriteAsync"/>
/// calls: messages are queued and sent to the underlying writer one at a time, and each caller's task finishes
/// when its own message has been handled.
/// </summary>
/// <typeparam name="T">The type of message written to the stream.</typeparam>
public sealed class QueuedAsyncWriteable<T>
{
    private readonly IClientStreamWriter<T> m_StreamWriter;

    // Messages waiting to be sent, each paired with the completion source of the caller that queued it.
    private readonly IProducerConsumerCollection<(T, UniTaskCompletionSource)> m_Queue =
        new ConcurrentQueue<(T, UniTaskCompletionSource)>();

    // True while a write on the underlying stream writer is in flight.
    private volatile bool m_Busy;

    /// <summary>
    /// Creates a queued writer around <paramref name="streamWriter"/>.
    /// </summary>
    /// <param name="streamWriter">The stream writer that messages are ultimately written to.</param>
    public QueuedAsyncWriteable(IClientStreamWriter<T> streamWriter)
    {
        m_StreamWriter = streamWriter;
    }

    /// <summary>
    /// Forwards to <see cref="IClientStreamWriter{T}.CompleteAsync"/> on the underlying writer. Queued messages
    /// are not awaited first.
    /// </summary>
    public async UniTask CompleteAsync() => await m_StreamWriter.CompleteAsync();

    /// <summary>
    /// Queues <paramref name="message"/> and, unless a write is already in flight, drains the queue by writing
    /// queued messages one after another.
    /// </summary>
    /// <param name="message">The message to send.</param>
    /// <returns>A task that finishes when this call's message has been written, cancelled
    /// (<see cref="OperationCanceledException"/> or <see cref="InvalidOperationException"/> from the underlying
    /// write), or has failed with any other exception.</returns>
    public async UniTask WriteAsync(T message)
    {
        var myCompletion = new UniTaskCompletionSource();

        // The original author notes this is effectively single-threaded; the busy check below relies on that.
        m_Queue.TryAdd((message, myCompletion));

        while (true)
        {
            // Another call is mid-write; it will pick up our message when it loops around.
            if (m_Busy)
            {
                break;
            }

            if (!m_Queue.TryTake(out var pending))
            {
                break;
            }

            m_Busy = true;
            var payload = pending.Item1;
            var pendingCompletion = pending.Item2;
            try
            {
                await m_StreamWriter.WriteAsync(payload);
                pendingCompletion.TrySetResult();
            }
            catch (Exception ex) when (ex is OperationCanceledException || ex is InvalidOperationException)
            {
                // Both kinds of failure are reported to the queuing caller as a cancellation.
                pendingCompletion.TrySetCanceled();
            }
            catch (Exception ex)
            {
                pendingCompletion.TrySetException(ex);
            }
            finally
            {
                m_Busy = false;
            }
        }

        // Our own message may have been sent by this call or by another call's drain loop.
        await myCompletion.Task;
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment