Skip to content

Instantly share code, notes, and snippets.

@5alamander
Last active February 15, 2017 14:12
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 5alamander/d06df06f008f918e236ee0b9c53f328d to your computer and use it in GitHub Desktop.
Save 5alamander/d06df06f008f918e236ee0b9c53f328d to your computer and use it in GitHub Desktop.
初版csp_for_unity。TODO: 1,处理异常时,在unity线程和系统线程处理方式不一致,2.系统线程中无法使用 timeout,3.外部无法手动停止系统线程中的某个coroutine,4.只使用了1个外部线程。更新:使用linkedlist替代queue,闲置时挂起线程。
using System;
using System.Collections;
using System.Collections.Generic;
using UnityEngine;
using System.Threading;
namespace Sa1 {
/// <summary>
/// ins: do not use timeout in the csp.subthread
/// </summary>
public class csp : MonoBehaviour {
public class CSPException : Exception {
public CSPException(string str) : base("[Sa1.csp] " + str) { }
}
public class ChannelCloseException : CSPException {
public ChannelCloseException(string fstr) : base("Channel is closed: " + fstr) { }
}
public class CoroutineExitException : CSPException {
public CoroutineExitException() : base("Coroutine is sub-thread is closed") { }
}
private static bool _initialized = false;
private static void initialize () {
if (!_initialized && Application.isPlaying) {
_initialized = true;
var g = new GameObject("[Sa1.csp]");
_instance = g.AddComponent<csp>();
}
}
private static csp _instance;
public static csp Instance {
get {
initialize();
return _instance;
}
}
private int _mainThreadId;
private readonly Queue _queuedUnityFunctor = Queue.Synchronized(new Queue());
private bool _isRunning;
private int _runningCount;
private readonly Queue _queuedCoroutines = new Queue();
private readonly LinkedList<IEnumerator> _runningCoroutinesList = new LinkedList<IEnumerator>();
private void Awake () {
_isRunning = true;
_mainThreadId = Thread.CurrentThread.ManagedThreadId;
DontDestroyOnLoad(gameObject);
}
private void Start () {
go(_ => {
try {
excuteCoroutines();
}
catch (Exception e) {
Debug.LogError("[csp.SubThread]" + e);
}
});
}
private void Update () {
// do the functor on the delay queue, from sub to unity
while (_queuedUnityFunctor.Count > 0) {
var data = (BackToUnityData) _queuedUnityFunctor.Dequeue();
go(data.behaviour, data.functor);
}
}
private void OnApplicationQuit () {
_isRunning = false;
}
private void addToSubCoroutineList (IEnumerator enumerator) {
lock (_queuedCoroutines) {
_queuedCoroutines.Enqueue(enumerator);
}
}
private void excuteCoroutines () {
while (_isRunning) {
_runningCount = _runningCoroutinesList.Count; // show in inspector
if (_runningCount > 0 || _queuedCoroutines.Count > 0) {
// adding -> running
lock (_queuedCoroutines) {
foreach (var functor in _queuedCoroutines) {
_runningCoroutinesList.AddLast(new LinkedListNode<IEnumerator>((IEnumerator) functor));
}
_queuedCoroutines.Clear();
}
for (var currentNode = _runningCoroutinesList.First;
currentNode != null;
currentNode = currentNode.Next) {
try {
IEnumerator functor = currentNode.Value;
if (functor.MoveNext()) {
var func = functor.Current as Func<MonoBehaviour>;
if (func != null) {
// if yield return csp.toUnity(behaviour), set to unity
var theBehaviour = func();
Instance._queuedUnityFunctor.Enqueue(
new BackToUnityData(theBehaviour ? theBehaviour : Instance, functor));
_runningCoroutinesList.Remove(currentNode);
// continue; // skip next tick
}
// _nextTempCoroutines.Enqueue(functor); // not skip next tick
}
else {
_runningCoroutinesList.Remove(currentNode);
}
}
catch (Exception e) {
Debug.LogError(e);
Debug.LogError(new CoroutineExitException());
}
}
}
else {
Thread.Sleep(1); // suspend this thread, when this thread is idle
}
}
}
public static bool isInMainThread {
get { return Instance._mainThreadId == Thread.CurrentThread.ManagedThreadId; }
}
private static readonly object _toSub = new object();
public static object toSub () {
return _toSub;
}
public static Func<MonoBehaviour> toUnity (MonoBehaviour behaviour = null) {
return () => behaviour;
}
private struct BackToUnityData {
public readonly MonoBehaviour behaviour;
public readonly IEnumerator functor;
public BackToUnityData (MonoBehaviour theBehaviour, IEnumerator thefunctor) {
behaviour = theBehaviour;
functor = thefunctor;
}
}
/// <summary>
/// the take action and alt action result wrapper
/// </summary>
public class Result {
private object _value;
public object value {
get {
if (exception != null)
throw exception;
return _value;
}
set { _value = value; }
}
public bool isTimeout { get; set; }
public bool closed { get; set; } // channel closed
public CSPException exception;
public void copy (Result other) {
_value = other._value;
isTimeout = other.isTimeout;
closed = other.closed;
exception = other.exception;
}
}
/// <summary>
/// Chan the specified n as the default buffer length,
/// the default buffer size is 0
/// </summary>
public static Channel chan (int n = 0) {
return chan(n, Channel.BufferType.Fixed);
}
/// <summary>
/// Chan the specified n and bufferType. Type maybe changed at any time
/// </summary>
public static Channel chan (int n, Channel.BufferType bufferType) {
return new Channel(n, bufferType);
}
public static void go (Action<object> action, object state = null) {
ThreadPool.QueueUserWorkItem(new WaitCallback(action), state);
}
public static void go (IEnumerator functor) {
initialize();
Instance.addToSubCoroutineList(new SubCoroutineWrapper(functor));
}
/// <summary>
/// set to the global event loop and return the running channel
/// </summary>
public static Coroutine go (MonoBehaviour theBehaviour, IEnumerator functor) {
initialize();
if (isInMainThread) {
// startCoroutine to run this functor
return theBehaviour.StartCoroutine(startCspGoroutine(functor));
}
else {
Instance._queuedUnityFunctor.Enqueue(new BackToUnityData(theBehaviour, functor));
return null;
}
}
private static IEnumerator startCspGoroutine (IEnumerator functor) {
while (functor.MoveNext()) {
if (functor.Current == _toSub) {
go(functor);
yield break;
}
if (functor.Current is Yield) {
var y = (Yield) functor.Current;
while (true) {
try {
if (!y.MoveNext()) {
break; // break this while in Yield
}
}
catch (Exception e) {
Debug.LogError(e);
yield break; // kill this coroutine
}
yield return y.Current;
}
}
else {
yield return functor.Current;
}
}
}
/// <summary>
/// Take from the specified chan.
/// </summary>
public static Yield take (out Result result, Channel channel) {
return new Channel.TakeYield(out result, channel);
}
/// <summary>
/// Put value to the channel
/// </summary>
public static Yield put (out Result result, Channel channel, object value) {
return new Channel.PutYield(out result, channel, value);
}
public static Yield put (Channel channel, object value) {
Result ret;
return new Channel.PutYield(out ret, channel, value);
}
public static Yield timeout (float time) {
return new Channel.TimeoutYield(time);
}
public static Yield alts (out Result result, params Yield[] ienumerators) {
return new Channel.AltsYield(out result, ienumerators);
}
public static Yield altsInTime (out Result result, float time, params Yield[] ienumerators) {
var list = new List<Yield>(ienumerators) {timeout(time)};
return new Channel.AltsYield(out result, list);
}
/// <summary>
/// Asyncs the take.
/// </summary>
/// <param name="obj">Object.</param>
/// <param name="channel">Chan.</param>
/// <param name="cb">Cb.</param>
public static void takeAsync (MonoBehaviour obj, Channel channel, Action<Result> cb = null) {
go(obj, takeAsyncFunction(channel, cb));
}
private static IEnumerator takeAsyncFunction (Channel channel, Action<Result> cb) {
Result ret;
yield return take(out ret, channel);
if (cb != null) cb(ret);
}
/// <summary>
/// Asyncs the put.
/// </summary>
/// <param name="obj">Object.</param>
/// <param name="channel">Chan.</param>
/// <param name="value">Value.</param>
/// <param name="cb">Cb.</param>
public static void putAsync (MonoBehaviour obj, Channel channel, object value, Action<Result> cb = null) {
go(obj, putAsyncFunctor(channel, value, cb));
}
private static IEnumerator putAsyncFunctor (Channel channel, object value, Action<Result> cb) {
Result ret;
yield return put(out ret, channel, value);
if (cb != null) cb(ret);
}
public static bool offer (Channel channel, object value) {
return channel.offerValue(value);
}
public static bool poll (Channel channel, out object outValue) {
return channel.pollValue(out outValue);
}
/// <summary>
/// Yield. instruction for channel to use
/// </summary>
public class Yield : IEnumerator {
public Result yieldResult { get; private set; }
protected Channel _channel;
public Yield (Channel chan) {
_channel = chan;
yieldResult = new Result();
}
public Yield (out Result result, Channel chan) {
_channel = chan;
yieldResult = result = new Result();
}
public object Current { get; private set; }
public virtual bool MoveNext () {
return false; // default end the IEnumerator
}
public void yieldResultApply () {
Current = yieldResult.value;
}
public void Reset () {}
}
public class Channel : Yield {
public enum BufferType {
Fixed, Dropping, Sliding
}
//public int dirtyGet { get; private set; }
//public int dirtyPut { get; private set; }
public readonly bool buffered;
public readonly BufferType bufferType;
private readonly Queue _buffer;
private readonly int _maxBufferSize;
#region lock these data
private readonly object thisLock = new object();
public object fixedValue { get; private set; }
public Yield thePutYield { get; private set; }
public bool closed { get; private set; }
#endregion
public Channel (int n, BufferType type) : base(null) {
closed = false;
_maxBufferSize = n;
bufferType = type;
if (n > 0) {
buffered = true;
_buffer = Queue.Synchronized(new Queue(n)); // thread safe
}
}
public override bool MoveNext () {
try {
object value;
var hasNext = !pollValue(out value);
yieldResult.value = value;
yieldResultApply();
return hasNext;
}
catch (ChannelCloseException e) {
yieldResult.closed = true;
yieldResult.exception = e;
}
yieldResultApply();
return false;
}
public bool isEmpty {
get { return _buffer.Count == 0; }
}
public bool isfull {
get { return _buffer.Count >= _maxBufferSize; }
}
public void close () {
lock (thisLock) {
closed = true;
}
}
private bool tryOfferValue (object value) {
if (closed)
throw new ChannelCloseException("tryOfferValue");
if (buffered) {
if (isfull) {
return false; // block
}
_buffer.Enqueue(value);
return true;
}
if (fixedValue != null) {
return false; // block
}
lock (thisLock) {
fixedValue = value;
}
return true;
}
public bool offerValue (object value) {
if (closed)
throw new ChannelCloseException("offerValue");
var tryOffered = tryOfferValue(value);
if (tryOffered) return true;
if (bufferType == BufferType.Sliding) {
if (buffered) {
_buffer.Dequeue();
_buffer.Enqueue(value);
}
else {
lock (thisLock) {
fixedValue = value;
}
}
return true;
}
// else if Dropping, fixed
return false;
}
public bool pollValue (out object outValue) {
bool ret;
outValue = null;
if (buffered) {
if (isEmpty) {
ret = false; // no value
}
else {
outValue = _buffer.Dequeue();
ret = true;
}
}
else {
if (fixedValue == null) {
ret = false; // no value
}
else {
lock (thisLock) {
outValue = fixedValue;
fixedValue = null;
thePutYield = null;
}
ret = true;
}
}
if (!ret && closed) {
throw new ChannelCloseException("pollValue");
}
return ret;
}
public void setFixedValue (object value, PutYield putYield) {
lock (thisLock) {
fixedValue = value;
thePutYield = putYield;
}
}
// *** the yields ***
public class TakeYield : Yield {
public TakeYield (out Result result, Channel chan)
: base(out result, chan) {}
public override bool MoveNext () {
object takeResult;
bool polled;
try {
polled = _channel.pollValue(out takeResult);
}
catch (ChannelCloseException e) {
yieldResult.closed = true;
yieldResult.exception = e;
yieldResultApply();
return false;
}
yieldResult.value = takeResult;
yieldResultApply();
return !polled;
}
}
public class PutYield : Yield {
private readonly object _value;
private bool _isWaitingTake;
public PutYield (out Result result, Channel chan, object value)
: base(out result, chan) {
_value = value;
}
public override bool MoveNext () {
if (_channel.closed) {
yieldResult.closed = true;
yieldResult.value = null;
yieldResult.exception = new ChannelCloseException("PutYield.MoveNext");
yieldResultApply();
return false;
}
if (!_channel.buffered) {
if (_isWaitingTake) {
return _channel.thePutYield == this; // false - no block
}
_channel.setFixedValue(_value, this);
_isWaitingTake = true; // to wait this to be took
return true;
}
// else _channel is buffered
var offered = _channel.offerValue(_value);
if (offered) {
yieldResult.value = _value;
yieldResultApply();
return false; // no block
}
if (_channel.bufferType == BufferType.Dropping) {
return false;
}
return true; // block
}
}
public class TimeoutYield : Yield {
private readonly float _startTime;
private readonly float _lifeTime;
public TimeoutYield (float time) : base(null) {
if (isInMainThread) {
_startTime = Time.time;
_lifeTime = time;
}
else {
Debug.LogError("timeout only work in unity");
}
}
public override bool MoveNext () {
if (Time.time - _startTime < _lifeTime) {
return true;
}
yieldResult.isTimeout = true;
return false;
}
}
/// <summary>
/// Alts yield. normally used with a timeout yield
/// </summary>
public class AltsYield : Yield {
private readonly IList<Yield> _ienumerators;
public AltsYield (out Result result, IList<Yield> ienumerators)
: base(out result, null) {
_ienumerators = ienumerators;
}
public override bool MoveNext () {
//csp.ret = null; // TODO check if the goroutine is return immediat
foreach (var cspYeild in _ienumerators) {
var hasNext = cspYeild.MoveNext();
if (!hasNext) {
yieldResult.copy(cspYeild.yieldResult);
yieldResult.value = cspYeild.Current;
base.MoveNext();
return false;
}
}
base.MoveNext();
return true;
}
}
}
}
/// <summary>
/// hold a call stack to apply nested IEnumerator
/// </summary>
internal class SubCoroutineWrapper : IEnumerator {
private readonly Stack<IEnumerator> _functorStack = new Stack<IEnumerator>();
public SubCoroutineWrapper (IEnumerator functor) {
_functorStack.Push(functor);
}
public bool MoveNext () {
if (_functorStack.Count == 0) {
return false;
}
var currentEnumerator = _functorStack.Peek();
if (currentEnumerator.MoveNext()) {
// if there is a sub ienumerator
var ie = currentEnumerator.Current as IEnumerator;
if (ie != null) {
_functorStack.Push(ie);
return MoveNext();
}
Current = currentEnumerator.Current;
return true;
}
// move next false
_functorStack.Pop();
return MoveNext();
}
public void Reset () {}
public object Current { get; private set; }
}
}
public IEnumerator usage () {
yield return csp.toSub(); // 跳出unity线程
// do something ...
yield return csp.toUnity(); // 跳回unity线程
// yield return csp.toUnity(this); // 跳回unity线程时指定某个monobehaviour来运行
var ch = csp.chan(); // 默认阻塞的管道
var ch0 = csp.chan(2); // 缓存为2的管道
var ch1 = csp.chan(3, csp.Channel.BufferType.Sliding); // 管道选项
csp.Result ret; // 由于c# 3.5 没有 async 和 await,此处用一个变量当做yield的返回值
yield return csp.put(ch0, 1); // 放入管道, 如果被阻塞则等待
yield return csp.put(out ret, ch1, "asdf"); // 放入管道, 获得返回值,可以看看异常之类的
csp.putAsync(this, ch0, 1, _ => { Debug.Log("put succeed"); }); // 放入管道,不等待
yield return csp.take(out ret, ch0); // 从管道取出一个值
Debug.Log("take the result" + ret.value);
csp.takeAsync(this, ch1, result => {
Debug.Log("take async: " + result.value);
});
yield return csp.timeout(1f); // 等待1秒, 【注:timeout只能在unity线程中使用】
yield return csp.alts(out ret, csp.timeout(1f), ch, ch0, ch1);
// 从3个管道中取出一个值,或者1秒后没有值则超时
if (ret.isTimeout) {
Debug.Log("timeout");
}
else {
Debug.Log("value" + ret.value);
}
yield return csp.altsInTime(out ret, 1f, ch, ch0, ch1); // 效果同上, 少写一个timeout
}
// 调用
csp.go(this, usage()); // this处需要一个monobehaviour来运行初始coroutine,
//csp.go(usage()); // 或者不提供monobehaviour参数,则初始就在系统线程中
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment