// Created March 1, 2013 18:51
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using UnityEngine;
namespace Asynko
#region Interfaces
/// <summary>
/// Common base interface of every instruction object a routine
/// (<c>IEnumerable&lt;IAsync&gt;</c>) can yield. No members are visible for it in this listing.
/// </summary>
public interface IAsync
/// <summary>
/// Instruction that carries the final outcome of a task. Task.Step stores a yielded
/// instance in Task.TaskResult and marks the task finished.
/// </summary>
public interface IReturnResultAsync : IAsync
/// <summary> The outcome viewed as an exception; callers such as CallAsync.Result rethrow it when non-null. </summary>
Exception Exception { get; }
/// <summary> The outcome as an untyped object. </summary>
object Data { get; }
/// <summary> The outcome converted to <typeparamref name="T"/>. </summary>
T GetResult<T>();
/// <summary>
/// Marker for a "call" instruction. Task.Step only uses it to pick the trace message
/// ("Step calling" vs "Step registering join"); no members are visible in this listing.
/// </summary>
public interface ICallAsync : IAsync
/// <summary>
/// Instruction that makes the yielding task wait for a set of other tasks.
/// </summary>
public interface IJoinAsync : IAsync
/// <summary> The tasks being waited on; Task.Step copies this array into Task.Dependencies. </summary>
Async.Task[] Results { get; set; }
/// <summary>
/// A call instruction whose outcome can be read back as a typed value.
/// </summary>
/// <typeparam name="T"> Type of the value produced. </typeparam>
public interface IAsyncWithResult<T> : ICallAsync
/// <summary> The produced value. </summary>
T Result { get; set; }
/// <summary>
/// Lock instruction. It is disposable; the LockAsync implementation switches its intent
/// to "unlock" when disposed.
/// </summary>
public interface ILockAsync : IAsync, IDisposable
/// <summary> Object identifying the lock; used as the key passed to Scheduler.FindCriticalSection. </summary>
object LockIdentifier { get; }
public static class Async
/// <summary>
/// Reads the clock used for timeout bookkeeping.
/// </summary>
/// <returns> The current value of <c>Time.realtimeSinceStartup</c>. </returns>
internal static float GetTimeMeasure()
{
    var now = Time.realtimeSinceStartup;
    return now;
}
/// <summary>
/// Trace hook used by the scheduler and tasks. Tracing is currently switched off,
/// so the call does nothing.
/// </summary>
/// <param name="format"> Composite format string. </param>
/// <param name="args"> Arguments for <paramref name="format"/>. </param>
internal static void WriteLine(string format, params object[] args)
{
    // Tracing disabled. Uncomment to route trace output to the Unity console:
    //Debug.Log(string.Format(format, args));
}
#region API
/// <summary>
/// Wraps a routine in a call instruction whose typed result can be read once it completes.
/// </summary>
/// <typeparam name="T"> Type the routine's result is cast to. </typeparam>
/// <param name="routine"> Routine to invoke. </param>
/// <returns> The call instruction; yield it, then read its <c>Result</c>. </returns>
public static IAsyncWithResult<T> Call<T>(this IEnumerable<IAsync> routine)
{
    IAsyncWithResult<T> invocation = new CallAsync<T>(routine);
    return invocation;
}
/// <summary>
/// Hands every routine to the current scheduler and returns a join instruction over
/// the resulting tasks.
/// </summary>
/// <param name="routines"> Routines to start. </param>
/// <returns> Join instruction whose <c>Results</c> are the started tasks, in argument order. </returns>
public static IJoinAsync Join(params IEnumerable<IAsync>[] routines)
{
    var started = from routine in routines
                  select Scheduler.Current.Exec(routine);
    return new JoinAsync(started.ToArray());
}
/// <summary>
/// Turns a join instruction into a routine that yields just that instruction.
/// </summary>
/// <param name="join"> Join instruction to wrap. </param>
/// <returns> A one-element routine. </returns>
public static IEnumerable<IAsync> Wrap(this IJoinAsync join)
{
    IAsync only = join;
    yield return only;
}
/// <summary>
/// Creates the instruction a routine yields to finish with <paramref name="value"/> as its result.
/// </summary>
/// <typeparam name="T"> Type of the result value. </typeparam>
/// <param name="value"> Value to return. </param>
public static IAsync Result<T>(T value)
{
    IAsync outcome = new ReturnResultAsync<T>(value);
    return outcome;
}
/// <summary>
/// Creates a "nothing to report" instruction; yielding it simply gives control back to the scheduler.
/// </summary>
/// <param name="tag"> Optional value carried along and shown in trace output. </param>
public static IAsync Void(object tag = null)
{
    IAsync idle = new VoidAsync(tag);
    return idle;
}
/// <summary>
/// Creates a lock instruction for the given identifier.
/// </summary>
/// <param name="o"> Object identifying the lock; must not be null. </param>
/// <exception cref="ArgumentNullException"> <paramref name="o"/> is null. </exception>
public static ILockAsync Lock(object o)
{
    ILockAsync handle = new LockAsync(o);
    return handle;
}
#region Extensions
/// <summary>
/// Routine that reads <paramref name="stream"/> to its end in 1 KiB chunks using
/// BeginRead/EndRead, yielding a Void instruction between polls, and finally returns
/// a <see cref="MemoryStream"/> holding everything that was read.
/// </summary>
/// <param name="stream"> Stream to drain. </param>
/// <returns> Routine whose result is the filled <see cref="MemoryStream"/>. </returns>
/// <remarks>
/// An exception raised by EndRead inside the completion callback is captured and
/// rethrown from the routine, so the owning task fails instead of polling forever.
/// </remarks>
private static IEnumerable<IAsync> ReadToEndAsync(this Stream stream)
{
    const int chunkSize = 1024;
    var ms = new MemoryStream();
    var buffer = new byte[chunkSize];
    var finished = false;
    var ready = true;          // true when no read is in flight and the next one may start
    Exception failure = null;  // set by the callback when EndRead throws

    AsyncCallback cb = ar =>
    {
        try
        {
            var length = stream.EndRead(ar);
            if (length > 0)
            {
                ms.Write(buffer, 0, length);
                ready = true;
            }
            else
            {
                // Zero bytes means end of stream.
                finished = true;
                ready = false;
            }
        }
        catch (Exception ex)
        {
            // Without this the loop below would never observe completion.
            failure = ex;
            ready = false;
            finished = true;
        }
    };

    while (!finished)
    {
        yield return Void(ready);
        if (ready)
        {
            ready = false;
            stream.BeginRead(buffer, 0, chunkSize, cb, null);
        }
    }

    if (failure != null)
    {
        // Surfaces the read error on the scheduler side (stack trace restarts here).
        throw failure;
    }

    yield return Result(ms);
}
/// <summary>
/// Routine that reads <paramref name="stream"/> to its end and returns the bytes read.
/// </summary>
/// <param name="stream"> Stream to drain. </param>
/// <returns> Routine whose result is a <c>byte[]</c> containing exactly the bytes read. </returns>
public static IEnumerable<IAsync> ReadBytesToEndAsync(this Stream stream)
{
    var reading = stream.ReadToEndAsync().Call<MemoryStream>();
    yield return reading;
    // ToArray copies only the written bytes; GetBuffer would expose the whole backing
    // array, including unused capacity past Length.
    yield return Result(reading.Result.ToArray());
}
/// <summary>
/// Routine that reads <paramref name="stream"/> to its end and returns the content as text,
/// decoded by a default-constructed <see cref="StreamReader"/>.
/// </summary>
/// <param name="stream"> Stream to drain. </param>
/// <returns> Routine whose result is the decoded string. </returns>
public static IEnumerable<IAsync> ReadStringToEndAsync(this Stream stream)
{
    var pending = stream.ReadToEndAsync().Call<MemoryStream>();
    yield return pending;

    var content = pending.Result;
    // Rewind so the reader starts at the first byte that was collected.
    content.Position = 0;
    var text = new StreamReader(content).ReadToEnd();
    yield return Result(text);
}
/// <summary>
/// Routine that writes <paramref name="buffer"/> to <paramref name="stream"/> in 1 KiB
/// chunks using BeginWrite/EndWrite, yielding a Void instruction between polls.
/// </summary>
/// <param name="stream"> Destination stream. </param>
/// <param name="buffer"> Bytes to write; an empty array issues a single zero-length write. </param>
/// <returns> Routine that completes once every chunk has been written. </returns>
/// <remarks>
/// Each BeginWrite is completed with EndWrite. An exception raised by EndWrite is
/// captured in the callback and rethrown from the routine.
/// </remarks>
public static IEnumerable<IAsync> WriteAllAsync(this Stream stream, byte[] buffer)
{
    const int chunkSize = 1024;
    var finished = false;
    var ready = true;          // true when no write is in flight and the next one may start
    var position = 0;          // offset of the chunk currently being written
    var size = buffer.Length;
    var current = Math.Min(size, chunkSize);
    Exception failure = null;  // set by the callback when EndWrite throws

    AsyncCallback cb = ar =>
    {
        try
        {
            // Required to complete the operation and to observe write errors.
            stream.EndWrite(ar);
            position += current;
            current = Math.Min(size - position, chunkSize);
            if (position < size)
            {
                ready = true;
            }
            else
            {
                finished = true;
                ready = false;
            }
        }
        catch (Exception ex)
        {
            failure = ex;
            ready = false;
            finished = true;
        }
    };

    while (!finished)
    {
        yield return Void(ready);
        if (ready)
        {
            ready = false;
            stream.BeginWrite(buffer, position, current, cb, null);
        }
    }

    if (failure != null)
    {
        // Surfaces the write error on the scheduler side (stack trace restarts here).
        throw failure;
    }
}
/// <summary>
/// Routine that starts <paramref name="request"/> asynchronously and polls until the
/// response arrives, the request fails, or <paramref name="timeout"/> elapses.
/// </summary>
/// <param name="request"> Request to execute. </param>
/// <param name="timeout"> Maximum wait, in the units of <c>GetTimeMeasure()</c>. </param>
/// <returns> Routine whose result is the <see cref="WebResponse"/>. </returns>
/// <exception cref="TimeoutException">
/// No response arrived within <paramref name="timeout"/>; the request is aborted first.
/// </exception>
/// <remarks>
/// An exception raised by EndGetResponse (for example a WebException) is captured in
/// the callback and rethrown from the routine instead of being masked by a timeout.
/// </remarks>
public static IEnumerable<IAsync> GetResponceAsync(this WebRequest request, float timeout = 3f)
{
    var finished = false;
    WebResponse response = null;
    Exception failure = null;  // set by the callback when EndGetResponse throws
    var startTime = GetTimeMeasure();

    AsyncCallback cb = ar =>
    {
        try
        {
            response = request.EndGetResponse(ar);
        }
        catch (Exception ex)
        {
            failure = ex;
        }
        // Always signal completion, success or not, so the poll loop can exit.
        finished = true;
    };

    request.BeginGetResponse(cb, null);

    while (!finished)
    {
        yield return Void();
        // Do not discard a response that arrived while this routine was suspended.
        if (!finished && GetTimeMeasure() - startTime > timeout)
        {
            // Cancel the pending operation; the callback's catch absorbs the resulting error.
            request.Abort();
            throw new TimeoutException();
        }
    }

    if (failure != null)
    {
        // Surfaces the request error on the scheduler side (stack trace restarts here).
        throw failure;
    }

    yield return Result(response);
}
#region Primitives
/// <summary>
/// Join instruction: carries the tasks the yielding task has to wait for.
/// </summary>
private class JoinAsync : IJoinAsync
{
    /// <summary> Tasks being waited on. </summary>
    public Task[] Results { get; set; }

    /// <param name="threads"> Tasks to wait for. </param>
    public JoinAsync(params Task[] threads)
    {
        this.Results = threads;
    }
}
/// <summary>
/// Call instruction: starts <c>callable</c> as a child of the scheduler's current task
/// and exposes the child's outcome as a typed value.
/// </summary>
/// <typeparam name="T"> Type the child's result data is cast to. </typeparam>
private class CallAsync<T> : JoinAsync, ICallAsync, IAsyncWithResult<T>
{
    /// <param name="callable"> Routine to run as a child task. </param>
    public CallAsync(IEnumerable<IAsync> callable)
        : base(Scheduler.Current.Exec(callable, Scheduler.Current.CurrentTask))
    {
    }

    /// <summary>
    /// Result of the child task. Rethrows the child's exception when its outcome is one;
    /// otherwise casts the outcome's data to <typeparamref name="T"/>. Setting is not implemented.
    /// </summary>
    public T Result
    {
        get
        {
            var outcome = Results[0].TaskResult;
            var error = outcome.Exception;
            if (error == null)
            {
                return (T)outcome.Data;
            }
            throw error;
        }
        set { throw new NotImplementedException(); }
    }
}
/// <summary>
/// Result instruction: a plain holder for the value a routine finishes with.
/// </summary>
/// <typeparam name="T"> Static type of the stored value. </typeparam>
private class ReturnResultAsync<T> : IReturnResultAsync, IAsyncWithResult<T>
{
    /// <param name="data"> Value to store. </param>
    public ReturnResultAsync(T data)
    {
        this.Result = data;
    }

    /// <summary> The stored value. </summary>
    public T Result { get; set; }

    /// <summary> The stored value as an untyped object. </summary>
    public object Data
    {
        get { return Result; }
    }

    /// <summary> The stored value when it is an exception; otherwise null. </summary>
    public Exception Exception
    {
        get { return Result as Exception; }
    }

    /// <summary> Returns the stored value as <typeparamref name="T1"/>. </summary>
    /// <exception cref="InvalidCastException"> The stored value is not a <typeparamref name="T1"/> (this includes null). </exception>
    public T1 GetResult<T1>()
    {
        var stored = Data;
        if (!(stored is T1))
        {
            throw new InvalidCastException();
        }
        return (T1)stored;
    }
}
/// <summary>
/// "Nothing to report" instruction; Task.Step only traces its tag and carries on.
/// </summary>
private class VoidAsync : IAsync
{
    /// <param name="tag"> Arbitrary value shown in trace output. </param>
    public VoidAsync(object tag)
    {
        this.Tag = tag;
    }

    /// <summary> Value supplied at construction. </summary>
    public object Tag { get; private set; }
}
/// <summary> Doubles as the lock handle (disposable object) and the lock command (the thing a routine yields). </summary>
/// <remarks> Ev_genus, 21.02.2013. </remarks>
private class LockAsync : ILockAsync
{
    /// <summary> Identifier of the lock this command refers to. </summary>
    public object LockIdentifier { get; private set; }

    /// <summary> What yielding this command requests: Lock after construction, Unlock after Dispose. </summary>
    public LockIntent Intent { get; private set; }

    /// <param name="lockIdentifier"> Object identifying the lock. </param>
    /// <exception cref="ArgumentNullException"> <paramref name="lockIdentifier"/> is null. </exception>
    public LockAsync(object lockIdentifier)
    {
        if (lockIdentifier == null)
        {
            throw new ArgumentNullException();
        }

        this.LockIdentifier = lockIdentifier;
        this.Intent = LockIntent.Lock;
    }

    /// <summary> Flips the command to an unlock request. </summary>
    public void Dispose()
    {
        this.Intent = LockIntent.Unlock;
    }
}
/// <summary>
/// One running routine: wraps the routine's enumerator together with the bookkeeping
/// (dependencies, lock state, result) that Step and the scheduler use.
/// </summary>
public class Task
// Enumerator over the routine; the constructor obtains it and Step() advances it with MoveNext().
public IEnumerator<IAsync> Callable;
// Sections consulted by CurrentThreadsOwnsSection to decide whether this task holds a lock.
public List<CriticalSection> CriticalSections { get; set; }
// True once the routine has ended, returned a result, or thrown.
public bool Finished { get; set; }
// Tasks that must all be finished before Step() advances this task again.
public Task[] Dependencies { get; set; }
/// <summary>
/// Creates a task for <paramref name="callable"/> with no dependencies and no held sections.
/// </summary>
/// <param name="callable"> Routine to run; its enumerator is obtained immediately. </param>
public Task(IEnumerable<IAsync> callable)
{
    this.Callable = callable.GetEnumerator();
    this.CriticalSections = new List<CriticalSection>();
    // Start with an empty array rather than null so Step() can enumerate it unconditionally.
    this.Dependencies = new Task[0];
}
/// <summary>
/// Advances the routine by at most one MoveNext() and reacts to the instruction it yielded.
/// Does nothing while the task is finished, has unfinished dependencies, or is blocked on a section.
/// </summary>
/// <exception cref="InvalidOperationException"> Thrown from the lock / fall-through branches below. </exception>
// NOTE(review): the brace, try and else lines of this method are not visible in this listing;
// the branch structure described in the comments below is inferred — confirm against the full file.
internal void Step()
// TODO: sleep threads instead of busy-waiting
if (Finished) return;
// Waiting on a join/call: stay idle until every dependency has finished.
if (Dependencies.Any(x => !x.Finished)) return;
// Waiting for a critical section: stay idle until TryReleaseLock clears BlockingSection.
if (BlockingSection != null) return;
var stackFrame = Callable;
// MoveNext() returning false means the routine ran to its end.
Finished = !stackFrame.MoveNext();
// An exception thrown by the routine finishes the task with that exception as its result.
catch (Exception ex)
WriteLine("Exception {0}", ex);
Finished = true;
TaskResult = new ReturnResultAsync<Exception>(ex);
// Routine yielded something: dispatch on the concrete instruction type.
if (!Finished)
if (stackFrame.Current is VoidAsync)
WriteLine("Step returned Void({0})", (stackFrame.Current as VoidAsync).Tag);
// do nothing
else if (stackFrame.Current is JoinAsync)
// CallAsync derives from JoinAsync, so calls land here too; the check only selects the trace text.
if(stackFrame.Current is ICallAsync)
WriteLine("Step calling");
WriteLine("Step registering join");
// From now on this task waits for the joined/called tasks.
Dependencies = (stackFrame.Current as JoinAsync).Results;
else if (stackFrame.Current is IReturnResultAsync)
WriteLine("Step returning {0}", (stackFrame.Current as IReturnResultAsync).Data);
// A yielded result ends the task, even if the routine has more code after the yield.
TaskResult = (stackFrame.Current as IReturnResultAsync);
Finished = true;
WriteLine("Task completed");
else if (stackFrame.Current is LockAsync)
var lo = stackFrame.Current as LockAsync;
// NOTE(review): presumably dispatches to TryAcquireLock / TryReleaseLock depending on Intent;
// only the throw statements are visible here — verify.
if (lo.Intent == LockIntent.Lock)
throw new InvalidOperationException();
throw new InvalidOperationException();
// NOTE(review): the lines below look like the branch for a routine that ended without
// yielding a result (Finished already true after MoveNext) — confirm. If the catch block
// above falls through to here, the exception result would be overwritten; verify.
WriteLine("Step returning nothing");
Finished = true;
TaskResult = new ReturnResultAsync<object>(null);
WriteLine("Task completed");
/// <summary>
/// Handles a lock request from this task for the section identified by
/// <c>lo.LockIdentifier</c>: takes the section when it is free, otherwise records it
/// as the section this task is blocked on.
/// </summary>
/// <param name="lo"> Lock command yielded by the routine. </param>
// NOTE(review): brace/else lines and possibly some statements (e.g. the re-entrant increment
// mentioned in comment 2) are not visible in this listing — confirm against the full file.
public void TryAcquireLock(ILockAsync lo)
// Sections are looked up (and created on demand) by the scheduler, keyed by the identifier.
Async.CriticalSection cs = Scheduler.Current.FindCriticalSection(lo.LockIdentifier);
if (cs.OwnershipCount == 0)
// 1. we are first to acquire
cs.OwnershipCount = 1;
cs.OwningTask = this;
// NOTE(review): this replaces any existing waiting list with a fresh one.
cs.WaitingTasks = new List<Task>();
// NOTE(review): nothing visible here adds cs to CriticalSections, which is what
// CurrentThreadsOwnsSection inspects — verify ownership is recorded somewhere.
if (CurrentThreadsOwnsSection(cs))
// 2. acquire again (increment)
// 3. blocked on acquiring
// Step() will skip this task while BlockingSection is set.
BlockingSection = cs;
/// <summary>
/// Tells whether this task or any of its ancestors (via ParentTask) lists
/// <paramref name="cs"/> among its CriticalSections.
/// </summary>
/// <param name="cs"> Section to look for. </param>
/// <returns> True when the section is found on this task or an ancestor; otherwise false. </returns>
private bool CurrentThreadsOwnsSection(Async.CriticalSection cs)
{
    // Walk up the parent chain, starting with this task itself.
    for (var candidate = this; candidate != null; candidate = candidate.ParentTask)
    {
        if (candidate.CriticalSections.Contains(cs))
        {
            return true;
        }
    }
    return false;
}
/// <summary>
/// Handles an unlock request from this task for the section identified by
/// <c>lo.LockIdentifier</c> and, when tasks are waiting, hands the section to the
/// first waiting task that is blocked on it.
/// </summary>
/// <param name="lo"> Lock command whose intent has been switched to unlock. </param>
/// <exception cref="InvalidOperationException">
/// The section is not held, is not held by this task or one of its ancestors, or the
/// waiting list contains no task blocked on this section.
/// </exception>
// NOTE(review): brace/else lines and possibly some statements (e.g. the decrement for
// OwnershipCount > 1) are not visible in this listing — confirm against the full file.
public void TryReleaseLock(ILockAsync lo)
var cs = Scheduler.Current.FindCriticalSection(lo.LockIdentifier);
// Releasing a section nobody holds is a programming error.
if (cs.OwnershipCount == 0)
throw new InvalidOperationException();
// Only the owner (this task or an ancestor) may release.
if (!CurrentThreadsOwnsSection(cs))
throw new InvalidOperationException();
// NOTE(review): presumably a nested acquisition is just decremented here and the
// lines below run only for the final release — verify.
if (cs.OwnershipCount > 1)
cs.OwnershipCount = 0;
cs.OwningTask = null;
// should we do something with waiting threads such as unblock one?
if (cs.WaitingTasks.Count > 0)
// Pick the first waiter that is actually blocked on this section.
var nextOwner = cs.WaitingTasks.FirstOrDefault(thread => thread.BlockingSection == cs);
if (nextOwner == null)
throw new InvalidOperationException();
// Unblock the waiter and transfer ownership to it in one go.
nextOwner.BlockingSection = null;
cs.OwnershipCount = 1;
cs.OwningTask = nextOwner;
/// <summary> Outcome of the task; Step() sets it when the routine returns a result, ends, or throws. </summary>
public IReturnResultAsync TaskResult { get; set; }
/// <summary> Section this task is waiting for; Step() does not advance the task while this is non-null. </summary>
public CriticalSection BlockingSection { get; set; }
/// <summary> Task that started this one (set by Scheduler.Exec); null when none was supplied. </summary>
public Task ParentTask { get; set; }
/// <summary>
/// Bookkeeping for one lock identifier: who holds it, how many times, and who is waiting.
/// </summary>
public class CriticalSection
{
    /// <summary> Number of times the section is currently held; 0 means free. </summary>
    public int OwnershipCount { get; set; }

    /// <summary> Task currently holding the section, or null. </summary>
    public Task OwningTask { get; set; }

    /// <summary> Tasks waiting to take the section. </summary>
    public List<Task> WaitingTasks { get; set; }
}
/// <summary>
/// Cooperative scheduler: keeps the list of live tasks and advances each of them one
/// step per <see cref="Step"/> call. A single instance is exposed through <see cref="Current"/>.
/// </summary>
public class Scheduler
{
    static Scheduler()
    {
        Current = new Scheduler();
    }

    private Scheduler()
    {
        Tasks = new List<Task>();
        Sections = new Dictionary<object, CriticalSection>();
    }

    /// <summary> The one scheduler instance. </summary>
    public static Scheduler Current { get; private set; }

    /// <summary> Task most recently selected by <see cref="Step"/>; used as the parent of tasks started via Call. </summary>
    public Task CurrentTask { get; private set; }

    private Dictionary<object, CriticalSection> Sections { get; set; }

    private List<Task> Tasks { get; set; }

    // Guards against Step() being re-entered from inside a task.
    private bool _executing;

    /// <summary>
    /// Returns the critical section registered for <paramref name="identifier"/>,
    /// creating and registering it on first use.
    /// </summary>
    /// <param name="identifier"> Lock identifier; used as the dictionary key. </param>
    public CriticalSection FindCriticalSection(object identifier)
    {
        CriticalSection section;
        // Single lookup instead of ContainsKey followed by the indexer.
        if (!Sections.TryGetValue(identifier, out section))
        {
            section = new CriticalSection();
            Sections[identifier] = section;
        }
        return section;
    }

    /// <summary>
    /// Creates a task for routine <paramref name="a"/>, registers it with the scheduler
    /// and returns it.
    /// </summary>
    /// <param name="a"> Routine to run. </param>
    /// <param name="parent"> Task on whose behalf the routine runs, if any. </param>
    public Task Exec(IEnumerable<IAsync> a, Task parent = null)
    {
        var mt = new Task(a) { ParentTask = parent };
        // Without registration the task would never be stepped and Run() would see nothing to do.
        Tasks.Add(mt);
        return mt;
    }

    /// <summary> Steps the scheduler until no tasks remain. </summary>
    public void Run()
    {
        while (Tasks.Count > 0)
        {
            Step();
        }
    }

    /// <summary>
    /// Gives every registered task one step and removes the tasks that finished.
    /// </summary>
    /// <exception cref="InvalidOperationException"> Called while a step is already in progress. </exception>
    public void Step()
    {
        if (_executing)
        {
            throw new InvalidOperationException();
        }

        _executing = true;
        try
        {
            // Iterate over a snapshot: tasks may register new tasks (Call/Join) while stepping.
            foreach (var task in Tasks.ToArray())
            {
                CurrentTask = task;
                task.Step();
                if (task.Finished)
                {
                    Tasks.Remove(task);
                }
            }
        }
        finally
        {
            // Reset even when a task throws, otherwise every later Step() would be rejected.
            _executing = false;
        }
    }
}
public enum LockIntent
