Created
March 1, 2013 18:51
-
-
Save Evgenus/5066851 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.IO; | |
using System.Linq; | |
using System.Net; | |
using UnityEngine; | |
namespace Asynko | |
{ | |
#region Interfaces | |
public interface IAsync | |
{ | |
} | |
public interface IReturnResultAsync : IAsync | |
{ | |
Exception Exception { get; } | |
object Data { get; } | |
T GetResult<T>(); | |
} | |
public interface ICallAsync : IAsync | |
{ | |
} | |
public interface IJoinAsync : IAsync | |
{ | |
Async.Task[] Results { get; set; } | |
} | |
public interface IAsyncWithResult<T> : ICallAsync | |
{ | |
T Result { get; set; } | |
} | |
public interface ILockAsync : IAsync, IDisposable | |
{ | |
object LockIdentifier { get; } | |
} | |
#endregion | |
public static class Async | |
{ | |
internal static float GetTimeMeasure() | |
{ | |
return Time.realtimeSinceStartup; | |
} | |
internal static void WriteLine(string format, params object[] args) | |
{ | |
//Debug.Log(string.Format(format, args)); | |
} | |
#region API | |
public static IAsyncWithResult<T> Call<T>(this IEnumerable<IAsync> routine) | |
{ | |
return new CallAsync<T>(routine); | |
} | |
public static IJoinAsync Join(params IEnumerable<IAsync>[] routines) | |
{ | |
return new JoinAsync(routines.Select(routine => Scheduler.Current.Exec(routine)).ToArray()); | |
} | |
public static IEnumerable<IAsync> Wrap(this IJoinAsync join) | |
{ | |
yield return join; | |
} | |
public static IAsync Result<T>(T value) | |
{ | |
return new ReturnResultAsync<T>(value); | |
} | |
public static IAsync Void(object tag = null) | |
{ | |
return new VoidAsync(tag); | |
} | |
public static ILockAsync Lock(object o) | |
{ | |
return new LockAsync(o); | |
} | |
#endregion | |
#region Extensions | |
private static IEnumerable<IAsync> ReadToEndAsync(this Stream stream) | |
{ | |
const int chunkSize = 1024; | |
var ms = new MemoryStream(); | |
var buffer = new byte[chunkSize]; | |
var finished = false; | |
var ready = true; | |
AsyncCallback cb = ar => | |
{ | |
var length = stream.EndRead(ar); | |
if (length > 0) | |
{ | |
ms.Write(buffer, 0, length); | |
ready = true; | |
} | |
else | |
{ | |
finished = true; | |
ready = false; | |
} | |
}; | |
while (!finished) | |
{ | |
yield return Void(ready); | |
if (ready) | |
{ | |
ready = false; | |
stream.BeginRead(buffer, 0, chunkSize, cb, null); | |
} | |
} | |
yield return Result(ms); | |
} | |
public static IEnumerable<IAsync> ReadBytesToEndAsync(this Stream stream) | |
{ | |
var reading = stream.ReadToEndAsync().Call<MemoryStream>(); | |
yield return reading; | |
yield return Result(reading.Result.GetBuffer()); | |
} | |
public static IEnumerable<IAsync> ReadStringToEndAsync(this Stream stream) | |
{ | |
var reading = stream.ReadToEndAsync().Call<MemoryStream>(); | |
yield return reading; | |
var memoryStream = reading.Result; | |
memoryStream.Seek(0, SeekOrigin.Begin); | |
var streamReader = new StreamReader(memoryStream); | |
yield return Result(streamReader.ReadToEnd()); | |
} | |
public static IEnumerable<IAsync> WriteAllAsync(this Stream stream, byte[] buffer) | |
{ | |
const int chunkSize = 1024; | |
var finished = false; | |
var ready = true; | |
var position = 0; | |
var size = buffer.Length; | |
var current = Math.Min(size, chunkSize); | |
AsyncCallback cb = ar => | |
{ | |
stream.EndWrite(ar); | |
position += current; | |
current = Math.Min(size - position, chunkSize); | |
if (position < size) | |
{ | |
ready = true; | |
} | |
else | |
{ | |
finished = true; | |
ready = false; | |
} | |
}; | |
while(!finished) | |
{ | |
yield return Void(ready); | |
if (ready) | |
{ | |
ready = false; | |
stream.BeginWrite(buffer, position, current, cb, null); | |
} | |
} | |
} | |
public static IEnumerable<IAsync> GetResponceAsync(this WebRequest request, float timeout = 3f) | |
{ | |
var finished = false; | |
WebResponse response = null; | |
var startTime = GetTimeMeasure(); | |
AsyncCallback cb = ar => | |
{ | |
response = request.EndGetResponse(ar); | |
finished = true; | |
}; | |
request.BeginGetResponse(cb, null); | |
while (!finished) | |
{ | |
yield return Void(); | |
if (GetTimeMeasure() - startTime > timeout) | |
throw new TimeoutException(); | |
} | |
yield return Result(response); | |
} | |
#endregion | |
#region Primitives | |
private class JoinAsync : IJoinAsync | |
{ | |
public JoinAsync(params Task[] threads) | |
{ | |
Results = threads; | |
} | |
public Task[] Results { get; set; } | |
} | |
private class CallAsync<T> : JoinAsync, ICallAsync, IAsyncWithResult<T> | |
{ | |
public CallAsync(IEnumerable<IAsync> callable) | |
: base(Scheduler.Current.Exec(callable, Scheduler.Current.CurrentTask)) | |
{ | |
} | |
public T Result | |
{ | |
get | |
{ | |
var res = Results[0].TaskResult; | |
if (res.Exception != null) | |
throw res.Exception; | |
return (T)res.Data; | |
} | |
set { throw new NotImplementedException(); } | |
} | |
} | |
private class ReturnResultAsync<T> : IReturnResultAsync, IAsyncWithResult<T> | |
{ | |
public ReturnResultAsync(T data) | |
{ | |
Result = data; | |
} | |
public T Result { get; set; } | |
public Exception Exception | |
{ | |
get { return Result as Exception; } | |
} | |
public object Data | |
{ | |
get { return Result; } | |
} | |
public T1 GetResult<T1>() | |
{ | |
if (Data is T1) | |
{ | |
return (T1)Data; | |
} | |
else | |
{ | |
throw new InvalidCastException(); | |
} | |
} | |
} | |
private class VoidAsync : IAsync | |
{ | |
public object Tag { get; private set; } | |
public VoidAsync(object tag) | |
{ | |
Tag = tag; | |
} | |
} | |
/// <summary> This is really a mix of Lock (object) and LockCommand (thing which we yield) </summary> | |
/// <remarks> Ev_genus, 21.02.2013. </remarks> | |
private class LockAsync : ILockAsync | |
{ | |
public LockAsync(object lockIdentifier) | |
{ | |
if (lockIdentifier == null) throw new ArgumentNullException(); | |
LockIdentifier = lockIdentifier; | |
Intent = LockIntent.Lock; | |
} | |
public void Dispose() | |
{ | |
Intent = LockIntent.Unlock; | |
Scheduler.Current.CurrentTask.TryReleaseLock(this); | |
} | |
public LockIntent Intent { get; private set; } | |
public object LockIdentifier { get; private set; } | |
} | |
#endregion | |
public class Task | |
{ | |
public IEnumerator<IAsync> Callable; | |
public List<CriticalSection> CriticalSections { get; set; } | |
public bool Finished { get; set; } | |
public Task[] Dependencies { get; set; } | |
public Task(IEnumerable<IAsync> callable) | |
{ | |
Dependencies = new Task[0]; // NREs suck | |
CriticalSections = new List<CriticalSection>(); | |
Callable = callable.GetEnumerator(); | |
} | |
internal void Step() | |
{ | |
// TODO: sleep threads instead of busy-waiting | |
if (Finished) return; | |
if (Dependencies.Any(x => !x.Finished)) return; | |
if (BlockingSection != null) return; | |
var stackFrame = Callable; | |
try | |
{ | |
Finished = !stackFrame.MoveNext(); | |
} | |
catch (Exception ex) | |
{ | |
WriteLine("Exception {0}", ex); | |
Finished = true; | |
TaskResult = new ReturnResultAsync<Exception>(ex); | |
return; | |
} | |
if (!Finished) | |
{ | |
if (stackFrame.Current is VoidAsync) | |
{ | |
WriteLine("Step returned Void({0})", (stackFrame.Current as VoidAsync).Tag); | |
// do nothing | |
} | |
else if (stackFrame.Current is JoinAsync) | |
{ | |
if(stackFrame.Current is ICallAsync) | |
{ | |
WriteLine("Step calling"); | |
} | |
else | |
{ | |
WriteLine("Step registering join"); | |
} | |
Dependencies = (stackFrame.Current as JoinAsync).Results; | |
} | |
else if (stackFrame.Current is IReturnResultAsync) | |
{ | |
WriteLine("Step returning {0}", (stackFrame.Current as IReturnResultAsync).Data); | |
TaskResult = (stackFrame.Current as IReturnResultAsync); | |
Finished = true; | |
WriteLine("Task completed"); | |
} | |
else if (stackFrame.Current is LockAsync) | |
{ | |
var lo = stackFrame.Current as LockAsync; | |
if (lo.Intent == LockIntent.Lock) | |
{ | |
TryAcquireLock(lo); | |
} | |
else | |
{ | |
throw new InvalidOperationException(); | |
} | |
} | |
else | |
{ | |
throw new InvalidOperationException(); | |
} | |
} | |
else | |
{ | |
WriteLine("Step returning nothing"); | |
Finished = true; | |
TaskResult = new ReturnResultAsync<object>(null); | |
WriteLine("Task completed"); | |
} | |
} | |
public void TryAcquireLock(ILockAsync lo) | |
{ | |
Async.CriticalSection cs = Scheduler.Current.FindCriticalSection(lo.LockIdentifier); | |
if (cs.OwnershipCount == 0) | |
{ | |
// 1. we are first to acquire | |
cs.OwnershipCount = 1; | |
cs.OwningTask = this; | |
cs.WaitingTasks = new List<Task>(); | |
CriticalSections.Add(cs); | |
} | |
else | |
{ | |
if (CurrentThreadsOwnsSection(cs)) | |
{ | |
// 2. acquire again (increment) | |
cs.OwnershipCount++; | |
} | |
else | |
{ | |
// 3. blocked on acquiring | |
cs.WaitingTasks.Add(this); | |
BlockingSection = cs; | |
} | |
} | |
} | |
private bool CurrentThreadsOwnsSection(Async.CriticalSection cs) | |
{ | |
var th = this; | |
while (true) | |
{ | |
if (th.CriticalSections.Contains(cs)) return true; | |
if (th.ParentTask == null) | |
{ | |
return false; | |
} | |
th = th.ParentTask; | |
} | |
} | |
public void TryReleaseLock(ILockAsync lo) | |
{ | |
var cs = Scheduler.Current.FindCriticalSection(lo.LockIdentifier); | |
if (cs.OwnershipCount == 0) | |
{ | |
throw new InvalidOperationException(); | |
} | |
if (!CurrentThreadsOwnsSection(cs)) | |
{ | |
throw new InvalidOperationException(); | |
} | |
if (cs.OwnershipCount > 1) | |
{ | |
cs.OwnershipCount--; | |
} | |
else | |
{ | |
cs.OwnershipCount = 0; | |
cs.OwningTask = null; | |
// should we do something with waiting threads such as unblock one? | |
if (cs.WaitingTasks.Count > 0) | |
{ | |
var nextOwner = cs.WaitingTasks.FirstOrDefault(thread => thread.BlockingSection == cs); | |
if (nextOwner == null) | |
throw new InvalidOperationException(); | |
nextOwner.BlockingSection = null; | |
cs.WaitingTasks.Remove(nextOwner); | |
cs.OwnershipCount = 1; | |
cs.OwningTask = nextOwner; | |
} | |
else | |
{ | |
} | |
} | |
} | |
public IReturnResultAsync TaskResult { get; set; } | |
public CriticalSection BlockingSection { get; set; } | |
public Task ParentTask { get; set; } | |
} | |
public class CriticalSection | |
{ | |
public Task OwningTask { get; set; } | |
public List<Task> WaitingTasks { get; set; } | |
public int OwnershipCount { get; set; } | |
} | |
public class Scheduler | |
{ | |
static Scheduler() | |
{ | |
Current = new Scheduler(); | |
} | |
private Scheduler() | |
{ | |
Tasks = new List<Task>(); | |
Sections = new Dictionary<object, CriticalSection>(); | |
} | |
private Dictionary<object, CriticalSection> Sections { get; set; } | |
public CriticalSection FindCriticalSection(object identifier) | |
{ | |
if (!Current.Sections.ContainsKey(identifier)) | |
{ | |
Current.Sections[identifier] = new CriticalSection(); | |
} | |
return Current.Sections[identifier]; | |
} | |
private List<Task> Tasks { get; set; } | |
private bool _executing; | |
public Task Exec(IEnumerable<IAsync> a, Task parent = null) | |
{ | |
var mt = new Task(a) { ParentTask = parent }; | |
Tasks.Add(mt); | |
return mt; | |
} | |
public Task CurrentTask { get; private set; } | |
public void Run() | |
{ | |
while (Tasks.Count > 0) | |
{ | |
Step(); | |
} | |
} | |
public void Step() | |
{ | |
if (_executing) | |
{ | |
throw new InvalidOperationException(); | |
} | |
_executing = true; | |
foreach (var task in Tasks.ToArray()) | |
{ | |
CurrentTask = task; | |
task.Step(); | |
if (task.Finished) Tasks.Remove(task); | |
} | |
_executing = false; | |
} | |
public static Scheduler Current { get; private set; } | |
} | |
public enum LockIntent | |
{ | |
Lock, | |
Unlock | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment