Skip to content

Instantly share code, notes, and snippets.

@yKimisaki
Last active December 23, 2018 17:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yKimisaki/93c12b661bc1188ddf3f8ae0fe1aac46 to your computer and use it in GitHub Desktop.
Save yKimisaki/93c12b661bc1188ddf3f8ae0fe1aac46 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion;
using MagicOnion.Client;
using Minamo.Shared.Hubs;
using Minamo.Shared.Responses;
using UniRx;
using UniRx.Async;
using UnityEngine;
using UnityEngine.Networking;
using UnityEngine.UI;
namespace Minamo.Client
{
/// <summary>
/// Sample component for the general streaming hub: it displays the connection status,
/// sends "Hello" to the hub when the button is clicked, and places a copy of
/// <see cref="Prefab"/> at a random position for every message received.
/// </summary>
public class GeneralSample : MonoBehaviour
{
    public CanvasScaler CanvasScaler;
    public Text ConnectionStatus;
    public Button Button;
    public Transform Parent;
    public Image Prefab;

    /// <summary>
    /// Creates the receiver and the client, ties both to this component with AddTo,
    /// and wires up the three subscriptions.
    /// </summary>
    private void Start()
    {
        var messageReceiver = new GeneralReceiver();
        messageReceiver.AddTo(this);

        var hubClient = new GeneralClient(messageReceiver);
        hubClient.AddTo(this);

        this.ShowConnectionStatus(hubClient);
        this.SendHelloOnClick(hubClient);
        this.SpawnImageOnMessage(messageReceiver);
    }

    /// <summary>Mirrors the client's connection flag into the status label.</summary>
    private void ShowConnectionStatus(GeneralClient hubClient)
    {
        IObservable<string> statusText = hubClient.IsConnected
            .Select(connected => connected ? "Connected!!" : "Connecting...");

        statusText
            .SubscribeToText(this.ConnectionStatus)
            .AddTo(this);
    }

    /// <summary>Sends "Hello" through the hub on every click made while connected.</summary>
    private void SendHelloOnClick(GeneralClient hubClient)
    {
        this.Button.OnClickAsObservable()
            .Where(_ => hubClient.IsConnected.Value)
            // The hub is looked up at click time, not at subscription time.
            .SelectMany(_ => hubClient.Hub.SendMessageAsync("Hello").ToObservable())
            .Subscribe()
            .AddTo(this);
    }

    /// <summary>Spawns one image per received message; the message text itself is not used.</summary>
    private void SpawnImageOnMessage(GeneralReceiver messageReceiver)
    {
        messageReceiver
            .OnReceiveMessageAsObservable()
            .Subscribe(_ => this.SpawnImageAtRandomPosition())
            .AddTo(this);
    }

    /// <summary>
    /// Instantiates <see cref="Prefab"/> under <see cref="Parent"/> with unit scale and a random
    /// local position within half of the canvas scaler's reference resolution on each axis.
    /// </summary>
    private void SpawnImageAtRandomPosition()
    {
        var spawned = Instantiate(this.Prefab);
        spawned.transform.SetParent(this.Parent.transform);
        spawned.rectTransform.localScale = Vector3.one;

        var reference = this.CanvasScaler.referenceResolution;
        var randomX = UnityEngine.Random.Range(-reference.x, reference.x);
        var randomY = UnityEngine.Random.Range(-reference.y, reference.y);
        spawned.rectTransform.localPosition = new Vector2(randomX, randomY) / 2;
    }

    /// <summary>Client for <see cref="IGeneralHub"/> that delivers pushes to an <see cref="IGeneralReceiver"/>.</summary>
    public class GeneralClient : MinamoClient<IGeneralHub, IGeneralReceiver>
    {
        public GeneralClient(IGeneralReceiver receiver)
            : base(receiver)
        {
        }
    }

    /// <summary>
    /// Receiver that republishes the text of incoming <see cref="GeneralMessageResponse"/>
    /// instances as an observable sequence.
    /// </summary>
    private class GeneralReceiver : IGeneralReceiver, IDisposable
    {
        private Subject<string> _receiveMessage = new Subject<string>();

        /// <summary>Stream of received message texts, passed through ObserveOnMainThread.</summary>
        public IObservable<string> OnReceiveMessageAsObservable()
        {
            return this._receiveMessage.ObserveOnMainThread();
        }

        /// <summary>Forwards message responses to the subject; any other response type is ignored.</summary>
        Task IGeneralReceiver.OnReceiveMessage(IGeneralResponse response)
        {
            if (response is GeneralMessageResponse messageResponse)
            {
                this._receiveMessage.OnNext(messageResponse.Message);
            }

            return Task.CompletedTask;
        }

        /// <summary>Disposes the underlying subject.</summary>
        void IDisposable.Dispose()
        {
            this._receiveMessage.Dispose();
        }
    }
}
/// <summary>
/// Base class for streaming hub clients. Each instance registers itself with a shared
/// <see cref="ChannelConnetor"/>, which calls back into <see cref="IConnectableClient"/>
/// to connect and disconnect the hub.
/// </summary>
/// <typeparam name="TStreamingHub">Hub interface exposed through <see cref="Hub"/>.</typeparam>
/// <typeparam name="TReceiver">Receiver type handed to the hub connection.</typeparam>
public abstract class MinamoClient<TStreamingHub, TReceiver> : IConnectableClient, IDisposable where TStreamingHub : IStreamingHub<TStreamingHub, TReceiver>
{
    // NOTE(review): a static field of a generic class exists once per closed generic type, so every
    // distinct TStreamingHub/TReceiver pair gets its own connector. Confirm that this is intended
    // given the "global" name.
    private static ChannelConnetor _globalChannelConnector;

    /// <summary>The connected hub; only meaningful while <see cref="IsConnected"/> is true.</summary>
    public TStreamingHub Hub { get; private set; }

    private readonly TReceiver _receiver;

    // True while DisconnectHubAsync is waiting for the hub to finish disposing.
    private bool _isDisposing;

    private readonly ReactiveProperty<bool> _isConnected;

    /// <summary>
    /// Whether the hub is currently connected. The value is only changed on the Unity main
    /// thread, so subscribers may touch UI objects directly.
    /// </summary>
    public IReadOnlyReactiveProperty<bool> IsConnected => _isConnected;

    /// <summary>
    /// Creates and connects the shared connector on first use and arranges for it to be
    /// disposed when the application quits. Later calls return immediately.
    /// </summary>
    private static async UniTask CreateChannelConnectorAsync()
    {
        if (_globalChannelConnector != null)
        {
            return;
        }

        // Assigned before the first await so that callers arriving while the channel is
        // still connecting reuse this instance.
        _globalChannelConnector = new ChannelConnetor();
        await _globalChannelConnector.ConnectAsync();

        Observable.OnceApplicationQuit()
            .Subscribe(_ => _globalChannelConnector.DisposeAsync().Forget());
    }

    /// <summary>
    /// Stores the receiver and starts, without waiting, the connector setup followed by
    /// binding this client to the connector.
    /// </summary>
    /// <param name="receiver">Receiver passed to the hub connection.</param>
    protected MinamoClient(TReceiver receiver)
    {
        this._receiver = receiver;
        this._isConnected = new ReactiveProperty<bool>();

        CreateChannelConnectorAsync().ConfigureAwait(PlayerLoopTiming.Update)
            .ContinueWith(() => _globalChannelConnector.BindAsync(this).ConfigureAwait(PlayerLoopTiming.Update))
            .Forget();
    }

    /// <summary>Unbinds this client from the connector, which disconnects the hub.</summary>
    void IDisposable.Dispose()
    {
        // NOTE(review): if this runs before the constructor's BindAsync continuation, there is
        // nothing to unbind yet and the later bind still connects the hub. Confirm whether
        // callers can dispose that early.
        _globalChannelConnector.UnbindAsync(this).Forget();
    }

    /// <summary>
    /// Connects the hub over <paramref name="channel"/> unless it is already connected.
    /// If a disconnect is in progress, waits frame by frame until it has finished.
    /// </summary>
    /// <param name="channel">Channel to open the streaming hub on.</param>
    async UniTask IConnectableClient.ConnectHubAsync(Channel channel)
    {
        // The connector may call this from a thread-pool continuation. IsConnected
        // subscribers (for example UI bindings) must run on the main thread.
        await UniTask.SwitchToMainThread();

        while (this._isConnected.Value)
        {
            if (!this._isDisposing)
            {
                // Already connected and not being torn down: nothing to do.
                return;
            }

            await UniTask.DelayFrame(1);
        }

        // Assign the hub before raising IsConnected so that observers reacting to the flag
        // never see a missing or stale hub, and the flag is not left set if Connect throws.
        this.Hub = StreamingHubClient.Connect<TStreamingHub, TReceiver>(channel, this._receiver);
        this._isConnected.Value = true;
        channel.RegisterStreamingSubscription(this);
    }

    /// <summary>
    /// Disposes the hub and clears <see cref="IsConnected"/>. Does nothing when there is no
    /// connected hub or when a disconnect is already in progress.
    /// </summary>
    async UniTask IConnectableClient.DisconnectHubAsync()
    {
        // Same reasoning as in ConnectHubAsync: keep all state changes on the main thread.
        await UniTask.SwitchToMainThread();

        if (this._isDisposing || this.Hub == null || !this._isConnected.Value)
        {
            return;
        }

        this._isDisposing = true;
        try
        {
            await this.Hub.DisposeAsync().ConfigureAwait(false);
        }
        finally
        {
            // ConfigureAwait(false) may have resumed on another thread. Also reset the
            // flags when disposing failed, otherwise ConnectHubAsync would wait forever.
            await UniTask.SwitchToMainThread();
            this._isConnected.Value = false;
            this._isDisposing = false;
        }
    }
}
/// <summary>
/// A disposable client whose hub connection is opened and closed on request.
/// <see cref="ChannelConnetor"/> drives these methods when clients are bound or unbound
/// and when it reconnects the channel.
/// </summary>
internal interface IConnectableClient : IDisposable
{
    /// <summary>Connects the client's hub using the given channel.</summary>
    /// <param name="channel">Channel to connect over.</param>
    UniTask ConnectHubAsync(global::Grpc.Core.Channel channel);

    /// <summary>Disconnects the client's hub.</summary>
    UniTask DisconnectHubAsync();
}
/// <summary>
/// Owns one gRPC channel, publishes its state, and keeps the bound
/// <see cref="IConnectableClient"/> instances connected: when the channel reports
/// TransientFailure or Idle, all clients are disconnected, the channel is reconnected,
/// and the clients are connected again.
/// </summary>
internal class ChannelConnetor
{
    private const string DefaultHost = "localhost";
    private const int DefaultPort = 30507;

    private global::Grpc.Core.Channel _channel;
    private readonly ReactiveProperty<global::Grpc.Core.ChannelState> _channelState;

    /// <summary>Latest published channel state, delivered through ObserveOnMainThread.</summary>
    public IReadOnlyReactiveProperty<global::Grpc.Core.ChannelState> ChannelState { get; }

    private readonly HashSet<IConnectableClient> _clientList;
    private bool _isDisposed;

    public ChannelConnetor()
    {
        this._channelState = new ReactiveProperty<global::Grpc.Core.ChannelState>();
        this.ChannelState = this._channelState.ObserveOnMainThread().ToReadOnlyReactiveProperty();
        this._clientList = new HashSet<IConnectableClient>();
    }

    /// <summary>
    /// Creates the channel, starts the state-watching loop in the background and waits
    /// until the channel has connected.
    /// </summary>
    /// <param name="host">Server host; defaults to the value that used to be hard-coded.</param>
    /// <param name="port">Server port; defaults to the value that used to be hard-coded.</param>
    public async UniTask ConnectAsync(string host = DefaultHost, int port = DefaultPort)
    {
        var options = new[]
        {
            new global::Grpc.Core.ChannelOption("grpc.keepalive_permit_without_calls", 1),
        };
        this._channel = new global::Grpc.Core.Channel(host, port, ChannelCredentials.Insecure, options);

        this.ConnectLoopAsync().Forget();
        await this._channel.ConnectAsync().ConfigureAwait(false);
    }

    /// <summary>
    /// Watches the channel until it is shut down, publishing each state change and
    /// reconnecting the bound clients after TransientFailure or Idle.
    /// </summary>
    private async UniTask ConnectLoopAsync()
    {
        while (this._channel.State != global::Grpc.Core.ChannelState.Shutdown)
        {
            var lastObserved = this._channel.State;
            if (!await this._channel.TryWaitForStateChangedAsync(lastObserved).ConfigureAwait(false))
            {
                continue;
            }

            // Read the state once so that the published value is the same one the
            // branch below was chosen for.
            var current = this._channel.State;
            switch (current)
            {
                case global::Grpc.Core.ChannelState.Connecting:
                case global::Grpc.Core.ChannelState.Ready:
                    this._channelState.Value = current;
                    break;
                case global::Grpc.Core.ChannelState.TransientFailure:
                case global::Grpc.Core.ChannelState.Idle:
                    this._channelState.Value = current;
                    await this.ReconnectClientsAsync();
                    break;
                default:
                    return;
            }
        }
    }

    /// <summary>Disconnects every bound client, reconnects the channel, then connects the clients again.</summary>
    private async UniTask ReconnectClientsAsync()
    {
        await UniTask.WhenAll(this._clientList
            .Select(x => x.DisconnectHubAsync().ConfigureAwait(PlayerLoopTiming.Update))
            .ToArray());

        await this._channel.ConnectAsync().ConfigureAwait(false);

        // State changes that happen while waiting above are not seen by the loop, so
        // publish the current state here. Otherwise ChannelState would keep reporting
        // the failure after the channel has recovered.
        this._channelState.Value = this._channel.State;

        await UniTask.WhenAll(this._clientList
            .Select(x => x.ConnectHubAsync(this._channel).ConfigureAwait(PlayerLoopTiming.Update))
            .ToArray());
    }

    /// <summary>Registers <paramref name="client"/> and connects its hub; a client that is already bound is ignored.</summary>
    public async UniTask BindAsync(IConnectableClient client)
    {
        if (this._clientList.Add(client))
        {
            await client.ConnectHubAsync(this._channel).ConfigureAwait(PlayerLoopTiming.Update);
        }
    }

    /// <summary>Removes <paramref name="client"/> and disconnects its hub; a client that is not bound is ignored.</summary>
    public async UniTask UnbindAsync(IConnectableClient client)
    {
        if (this._clientList.Remove(client))
        {
            await client.DisconnectHubAsync().ConfigureAwait(PlayerLoopTiming.Update);
        }
    }

    /// <summary>Shuts the channel down; only the first call has an effect.</summary>
    public async UniTask DisposeAsync()
    {
        if (this._isDisposed)
        {
            return;
        }

        this._isDisposed = true;
        await this._channel.ShutdownAsync().ConfigureAwait(false);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment