-
-
Save yKimisaki/93c12b661bc1188ddf3f8ae0fe1aac46 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Linq; | |
using System.Threading.Tasks; | |
using Grpc.Core; | |
using MagicOnion; | |
using MagicOnion.Client; | |
using Minamo.Shared.Hubs; | |
using Minamo.Shared.Responses; | |
using UniRx; | |
using UniRx.Async; | |
using UnityEngine; | |
using UnityEngine.Networking; | |
using UnityEngine.UI; | |
namespace Minamo.Client | |
{ | |
public class GeneralSample : MonoBehaviour | |
{ | |
public CanvasScaler CanvasScaler; | |
public Text ConnectionStatus; | |
public Button Button; | |
public Transform Parent; | |
public Image Prefab; | |
private void Start() | |
{ | |
var receiver = new GeneralReceiver(); | |
receiver.AddTo(this); | |
var client = new GeneralClient(receiver); | |
client.AddTo(this); | |
client.IsConnected | |
.Select(x => x ? "Connected!!" : "Connecting...") | |
.SubscribeToText(this.ConnectionStatus) | |
.AddTo(this); | |
Button.OnClickAsObservable() | |
.Where(_ => client.IsConnected.Value) | |
.SelectMany(x => client.Hub.SendMessageAsync("Hello").ToObservable()) | |
.Subscribe() | |
.AddTo(this); | |
receiver | |
.OnReceiveMessageAsObservable() | |
.Subscribe(x => | |
{ | |
var image = Instantiate(this.Prefab); | |
image.transform.SetParent(this.Parent.transform); | |
image.rectTransform.localScale = Vector3.one; | |
var width = this.CanvasScaler.referenceResolution.x; | |
var height = this.CanvasScaler.referenceResolution.y; | |
image.rectTransform.localPosition = new Vector2(UnityEngine.Random.Range(-width, width), UnityEngine.Random.Range(-height, height)) / 2; | |
}) | |
.AddTo(this); | |
} | |
public class GeneralClient : MinamoClient<IGeneralHub, IGeneralReceiver> | |
{ | |
public GeneralClient(IGeneralReceiver receiver) : base(receiver) { } | |
} | |
private class GeneralReceiver : IGeneralReceiver, IDisposable | |
{ | |
private Subject<string> _receiveMessage; | |
public IObservable<string> OnReceiveMessageAsObservable() => this._receiveMessage.ObserveOnMainThread(); | |
public GeneralReceiver() | |
{ | |
this._receiveMessage = new Subject<string>(); | |
} | |
Task IGeneralReceiver.OnReceiveMessage(IGeneralResponse response) | |
{ | |
switch (response) | |
{ | |
case GeneralMessageResponse messageResponse: | |
this._receiveMessage.OnNext(messageResponse.Message); | |
break; | |
default: | |
break; | |
} | |
return Task.CompletedTask; | |
} | |
void IDisposable.Dispose() | |
{ | |
this._receiveMessage.Dispose(); | |
} | |
} | |
} | |
public abstract class MinamoClient<TStreamingHub, TReceiver> : IConnectableClient, IDisposable where TStreamingHub : IStreamingHub<TStreamingHub, TReceiver> | |
{ | |
private static ChannelConnetor _globalChannelConnector; | |
public TStreamingHub Hub { get; private set; } | |
private TReceiver _receiver; | |
private bool _isDisposing; | |
private ReactiveProperty<bool> _isConnected; | |
public IReadOnlyReactiveProperty<bool> IsConnected => _isConnected; | |
private static async UniTask CreateChannelConnectorAsync() | |
{ | |
if (_globalChannelConnector != null) | |
{ | |
return; | |
} | |
_globalChannelConnector = new ChannelConnetor(); | |
await _globalChannelConnector.ConnectAsync(); | |
Observable.OnceApplicationQuit() | |
.Subscribe(_ => _globalChannelConnector.DisposeAsync().Forget()); | |
} | |
protected MinamoClient(TReceiver receiver) | |
{ | |
this._receiver = receiver; | |
this._isConnected = new ReactiveProperty<bool>(); | |
CreateChannelConnectorAsync().ConfigureAwait(PlayerLoopTiming.Update) | |
.ContinueWith(() => _globalChannelConnector.BindAsync(this).ConfigureAwait(PlayerLoopTiming.Update)) | |
.Forget(); | |
} | |
void IDisposable.Dispose() | |
{ | |
_globalChannelConnector.UnbindAsync(this).Forget(); | |
} | |
async UniTask IConnectableClient.ConnectHubAsync(Channel channel) | |
{ | |
while (this._isConnected.Value) | |
{ | |
if (this._isDisposing) | |
{ | |
await UniTask.DelayFrame(1); | |
continue; | |
} | |
return; | |
} | |
this._isConnected.Value = true; | |
this.Hub = StreamingHubClient.Connect<TStreamingHub, TReceiver>(channel, this._receiver); | |
channel.RegisterStreamingSubscription(this); | |
} | |
async UniTask IConnectableClient.DisconnectHubAsync() | |
{ | |
if (this._isDisposing || this.Hub == null) | |
{ | |
return; | |
} | |
this._isDisposing = true; | |
await this.Hub.DisposeAsync().ConfigureAwait(false); | |
this._isConnected.Value = false; | |
this._isDisposing = false; | |
} | |
} | |
internal interface IConnectableClient : IDisposable | |
{ | |
UniTask ConnectHubAsync(Channel channel); | |
UniTask DisconnectHubAsync(); | |
} | |
internal class ChannelConnetor | |
{ | |
private global::Grpc.Core.Channel _channel; | |
private ReactiveProperty<global::Grpc.Core.ChannelState> _channelState; | |
public IReadOnlyReactiveProperty<global::Grpc.Core.ChannelState> ChannelState { get; } | |
private HashSet<IConnectableClient> _clientList; | |
private bool _isDisposed; | |
public ChannelConnetor() | |
{ | |
this._channelState = new ReactiveProperty<ChannelState>(); | |
this.ChannelState = this._channelState.ObserveOnMainThread().ToReadOnlyReactiveProperty(); | |
this._clientList = new HashSet<IConnectableClient>(); | |
} | |
public async UniTask ConnectAsync() | |
{ | |
var options = new[] | |
{ | |
new global::Grpc.Core.ChannelOption("grpc.keepalive_permit_without_calls", 1), | |
}; | |
this._channel = new global::Grpc.Core.Channel("localhost", 30507, ChannelCredentials.Insecure, options); | |
this.ConnectLoopAsync().Forget(); | |
await this._channel.ConnectAsync().ConfigureAwait(false); | |
} | |
private async UniTask ConnectLoopAsync() | |
{ | |
while (this._channel.State != global::Grpc.Core.ChannelState.Shutdown) | |
{ | |
var state = this._channel.State; | |
if (await this._channel.TryWaitForStateChangedAsync(state).ConfigureAwait(false)) | |
{ | |
switch (this._channel.State) | |
{ | |
case global::Grpc.Core.ChannelState.Connecting: | |
case global::Grpc.Core.ChannelState.Ready: | |
this._channelState.Value = this._channel.State; | |
break; | |
case global::Grpc.Core.ChannelState.TransientFailure: | |
case global::Grpc.Core.ChannelState.Idle: | |
this._channelState.Value = this._channel.State; | |
goto Retry; | |
default: | |
return; | |
} | |
} | |
continue; | |
Retry: | |
await UniTask.WhenAll(this._clientList | |
.Select(x => x.DisconnectHubAsync().ConfigureAwait(PlayerLoopTiming.Update)) | |
.ToArray()); | |
await this._channel.ConnectAsync().ConfigureAwait(false); | |
await UniTask.WhenAll(this._clientList | |
.Select(x => x.ConnectHubAsync(this._channel).ConfigureAwait(PlayerLoopTiming.Update)) | |
.ToArray()); | |
} | |
} | |
public async UniTask BindAsync(IConnectableClient client) | |
{ | |
if (this._clientList.Add(client)) | |
{ | |
await client.ConnectHubAsync(this._channel).ConfigureAwait(PlayerLoopTiming.Update); | |
} | |
} | |
public async UniTask UnbindAsync(IConnectableClient client) | |
{ | |
if (this._clientList.Remove(client)) | |
{ | |
await client.DisconnectHubAsync().ConfigureAwait(PlayerLoopTiming.Update); | |
} | |
} | |
public async UniTask DisposeAsync() | |
{ | |
if (this._isDisposed) | |
{ | |
return; | |
} | |
this._isDisposed = true; | |
await this._channel.ShutdownAsync().ConfigureAwait(false); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment