Skip to content

Instantly share code, notes, and snippets.

@yKimisaki
Last active June 18, 2019 15:59
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save yKimisaki/18c8cd28148936f1e9dad56c7d8396e5 to your computer and use it in GitHub Desktop.
Save yKimisaki/18c8cd28148936f1e9dad56c7d8396e5 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Linq;
using UniRx;
namespace Tonari.UniRx
{
public class ContinuousObservableQueue<T>
{
private Subject<Unit> _onAgitate;
private Subject<Unit> _onSettle;
private readonly Queue<QueueTask<T>> _queue;
public ContinuousObservableQueue()
{
this._queue = new Queue<QueueTask<T>>();
this._onAgitate = new Subject<Unit>();
this._onSettle = new Subject<Unit>();
}
/// <summary>
/// キューがない状態から、新たにキューが開始された時に呼ばれます。
/// </summary>
public IObservable<Unit> OnAgitateAsObservable()
{
return this._onAgitate;
}
/// <summary>
/// 現在積まれているキューをすべて終えた時に呼ばれます。
/// </summary>
public IObservable<Unit> OnSettleAsObservable()
{
return this._onSettle;
}
public IObservable<T> ReserveAsync(Func<IObservable<T>> action)
{
// キューがあるなら待ち状態
if (this._queue.Any())
{
var queueTask = new QueueTask<T>(action);
this._queue.Enqueue(queueTask);
return queueTask.GetResultAsync();
}
// 外部処理
this._onAgitate.OnNext(Unit.Default);
// 待ち状態でないときは初期化
var task = new QueueTask<T>(action);
this._queue.Enqueue(task);
// そのまま実行して、終わり次第次を呼ぶ
this.Step();
return task.GetResultAsync();
}
private void Step()
{
// キューにものがあったら
if (this._queue.Any())
{
// キューから引っ張り出してきて実行
var process = this._queue.Peek();
process.ExecuteAsync()
.Finally(() =>
{
// 終わったらキューから抜いて次のやつを実行するぞい
this._queue.Dequeue();
this.Step();
})
.Subscribe();
return;
}
// 残ってなかったら終了
this._queue.Clear();
// 外部処理
this._onSettle.OnNext(Unit.Default);
}
private class QueueTask<TItem>
{
private Func<IObservable<TItem>> _task;
private AsyncSubject<TItem> _result;
private bool _canceled;
public QueueTask(Func<IObservable<TItem>> task)
{
_task = task;
_result = new AsyncSubject<TItem>();
}
public IObservable<Unit> ExecuteAsync()
{
// キャンセルされていたら何もしないで次に進む
if (_canceled)
{
return Observable.ReturnUnit();
}
else
{
return _task()
.Do(x =>
{
_result.OnNext(x);
_result.OnCompleted();
})
.AsUnitObservable();
}
}
public IObservable<TItem> GetResultAsync()
{
if (_result.IsCompleted) return _result;
return _result.DoOnCancel(() => _canceled = true);
}
}
}
}
@yKimisaki
Copy link
Author

ぶっちゃけCoroutine使いましょう、という感じではあります

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment