Skip to content

Instantly share code, notes, and snippets.

@Evgenus
Created March 1, 2013 18:51
Show Gist options
  • Save Evgenus/5066851 to your computer and use it in GitHub Desktop.
Save Evgenus/5066851 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using UnityEngine;
namespace Asynko
{
#region Interfaces
public interface IAsync
{
}
public interface IReturnResultAsync : IAsync
{
Exception Exception { get; }
object Data { get; }
T GetResult<T>();
}
public interface ICallAsync : IAsync
{
}
public interface IJoinAsync : IAsync
{
Async.Task[] Results { get; set; }
}
public interface IAsyncWithResult<T> : ICallAsync
{
T Result { get; set; }
}
public interface ILockAsync : IAsync, IDisposable
{
object LockIdentifier { get; }
}
#endregion
public static class Async
{
internal static float GetTimeMeasure()
{
return Time.realtimeSinceStartup;
}
internal static void WriteLine(string format, params object[] args)
{
//Debug.Log(string.Format(format, args));
}
#region API
public static IAsyncWithResult<T> Call<T>(this IEnumerable<IAsync> routine)
{
return new CallAsync<T>(routine);
}
public static IJoinAsync Join(params IEnumerable<IAsync>[] routines)
{
return new JoinAsync(routines.Select(routine => Scheduler.Current.Exec(routine)).ToArray());
}
public static IEnumerable<IAsync> Wrap(this IJoinAsync join)
{
yield return join;
}
public static IAsync Result<T>(T value)
{
return new ReturnResultAsync<T>(value);
}
public static IAsync Void(object tag = null)
{
return new VoidAsync(tag);
}
public static ILockAsync Lock(object o)
{
return new LockAsync(o);
}
#endregion
#region Extensions
private static IEnumerable<IAsync> ReadToEndAsync(this Stream stream)
{
const int chunkSize = 1024;
var ms = new MemoryStream();
var buffer = new byte[chunkSize];
var finished = false;
var ready = true;
AsyncCallback cb = ar =>
{
var length = stream.EndRead(ar);
if (length > 0)
{
ms.Write(buffer, 0, length);
ready = true;
}
else
{
finished = true;
ready = false;
}
};
while (!finished)
{
yield return Void(ready);
if (ready)
{
ready = false;
stream.BeginRead(buffer, 0, chunkSize, cb, null);
}
}
yield return Result(ms);
}
public static IEnumerable<IAsync> ReadBytesToEndAsync(this Stream stream)
{
var reading = stream.ReadToEndAsync().Call<MemoryStream>();
yield return reading;
yield return Result(reading.Result.GetBuffer());
}
public static IEnumerable<IAsync> ReadStringToEndAsync(this Stream stream)
{
var reading = stream.ReadToEndAsync().Call<MemoryStream>();
yield return reading;
var memoryStream = reading.Result;
memoryStream.Seek(0, SeekOrigin.Begin);
var streamReader = new StreamReader(memoryStream);
yield return Result(streamReader.ReadToEnd());
}
public static IEnumerable<IAsync> WriteAllAsync(this Stream stream, byte[] buffer)
{
const int chunkSize = 1024;
var finished = false;
var ready = true;
var position = 0;
var size = buffer.Length;
var current = Math.Min(size, chunkSize);
AsyncCallback cb = ar =>
{
stream.EndWrite(ar);
position += current;
current = Math.Min(size - position, chunkSize);
if (position < size)
{
ready = true;
}
else
{
finished = true;
ready = false;
}
};
while(!finished)
{
yield return Void(ready);
if (ready)
{
ready = false;
stream.BeginWrite(buffer, position, current, cb, null);
}
}
}
public static IEnumerable<IAsync> GetResponceAsync(this WebRequest request, float timeout = 3f)
{
var finished = false;
WebResponse response = null;
var startTime = GetTimeMeasure();
AsyncCallback cb = ar =>
{
response = request.EndGetResponse(ar);
finished = true;
};
request.BeginGetResponse(cb, null);
while (!finished)
{
yield return Void();
if (GetTimeMeasure() - startTime > timeout)
throw new TimeoutException();
}
yield return Result(response);
}
#endregion
#region Primitives
private class JoinAsync : IJoinAsync
{
public JoinAsync(params Task[] threads)
{
Results = threads;
}
public Task[] Results { get; set; }
}
private class CallAsync<T> : JoinAsync, ICallAsync, IAsyncWithResult<T>
{
public CallAsync(IEnumerable<IAsync> callable)
: base(Scheduler.Current.Exec(callable, Scheduler.Current.CurrentTask))
{
}
public T Result
{
get
{
var res = Results[0].TaskResult;
if (res.Exception != null)
throw res.Exception;
return (T)res.Data;
}
set { throw new NotImplementedException(); }
}
}
private class ReturnResultAsync<T> : IReturnResultAsync, IAsyncWithResult<T>
{
public ReturnResultAsync(T data)
{
Result = data;
}
public T Result { get; set; }
public Exception Exception
{
get { return Result as Exception; }
}
public object Data
{
get { return Result; }
}
public T1 GetResult<T1>()
{
if (Data is T1)
{
return (T1)Data;
}
else
{
throw new InvalidCastException();
}
}
}
private class VoidAsync : IAsync
{
public object Tag { get; private set; }
public VoidAsync(object tag)
{
Tag = tag;
}
}
/// <summary> This is really a mix of Lock (object) and LockCommand (thing which we yield) </summary>
/// <remarks> Ev_genus, 21.02.2013. </remarks>
private class LockAsync : ILockAsync
{
public LockAsync(object lockIdentifier)
{
if (lockIdentifier == null) throw new ArgumentNullException();
LockIdentifier = lockIdentifier;
Intent = LockIntent.Lock;
}
public void Dispose()
{
Intent = LockIntent.Unlock;
Scheduler.Current.CurrentTask.TryReleaseLock(this);
}
public LockIntent Intent { get; private set; }
public object LockIdentifier { get; private set; }
}
#endregion
public class Task
{
public IEnumerator<IAsync> Callable;
public List<CriticalSection> CriticalSections { get; set; }
public bool Finished { get; set; }
public Task[] Dependencies { get; set; }
public Task(IEnumerable<IAsync> callable)
{
Dependencies = new Task[0]; // NREs suck
CriticalSections = new List<CriticalSection>();
Callable = callable.GetEnumerator();
}
internal void Step()
{
// TODO: sleep threads instead of busy-waiting
if (Finished) return;
if (Dependencies.Any(x => !x.Finished)) return;
if (BlockingSection != null) return;
var stackFrame = Callable;
try
{
Finished = !stackFrame.MoveNext();
}
catch (Exception ex)
{
WriteLine("Exception {0}", ex);
Finished = true;
TaskResult = new ReturnResultAsync<Exception>(ex);
return;
}
if (!Finished)
{
if (stackFrame.Current is VoidAsync)
{
WriteLine("Step returned Void({0})", (stackFrame.Current as VoidAsync).Tag);
// do nothing
}
else if (stackFrame.Current is JoinAsync)
{
if(stackFrame.Current is ICallAsync)
{
WriteLine("Step calling");
}
else
{
WriteLine("Step registering join");
}
Dependencies = (stackFrame.Current as JoinAsync).Results;
}
else if (stackFrame.Current is IReturnResultAsync)
{
WriteLine("Step returning {0}", (stackFrame.Current as IReturnResultAsync).Data);
TaskResult = (stackFrame.Current as IReturnResultAsync);
Finished = true;
WriteLine("Task completed");
}
else if (stackFrame.Current is LockAsync)
{
var lo = stackFrame.Current as LockAsync;
if (lo.Intent == LockIntent.Lock)
{
TryAcquireLock(lo);
}
else
{
throw new InvalidOperationException();
}
}
else
{
throw new InvalidOperationException();
}
}
else
{
WriteLine("Step returning nothing");
Finished = true;
TaskResult = new ReturnResultAsync<object>(null);
WriteLine("Task completed");
}
}
public void TryAcquireLock(ILockAsync lo)
{
Async.CriticalSection cs = Scheduler.Current.FindCriticalSection(lo.LockIdentifier);
if (cs.OwnershipCount == 0)
{
// 1. we are first to acquire
cs.OwnershipCount = 1;
cs.OwningTask = this;
cs.WaitingTasks = new List<Task>();
CriticalSections.Add(cs);
}
else
{
if (CurrentThreadsOwnsSection(cs))
{
// 2. acquire again (increment)
cs.OwnershipCount++;
}
else
{
// 3. blocked on acquiring
cs.WaitingTasks.Add(this);
BlockingSection = cs;
}
}
}
private bool CurrentThreadsOwnsSection(Async.CriticalSection cs)
{
var th = this;
while (true)
{
if (th.CriticalSections.Contains(cs)) return true;
if (th.ParentTask == null)
{
return false;
}
th = th.ParentTask;
}
}
public void TryReleaseLock(ILockAsync lo)
{
var cs = Scheduler.Current.FindCriticalSection(lo.LockIdentifier);
if (cs.OwnershipCount == 0)
{
throw new InvalidOperationException();
}
if (!CurrentThreadsOwnsSection(cs))
{
throw new InvalidOperationException();
}
if (cs.OwnershipCount > 1)
{
cs.OwnershipCount--;
}
else
{
cs.OwnershipCount = 0;
cs.OwningTask = null;
// should we do something with waiting threads such as unblock one?
if (cs.WaitingTasks.Count > 0)
{
var nextOwner = cs.WaitingTasks.FirstOrDefault(thread => thread.BlockingSection == cs);
if (nextOwner == null)
throw new InvalidOperationException();
nextOwner.BlockingSection = null;
cs.WaitingTasks.Remove(nextOwner);
cs.OwnershipCount = 1;
cs.OwningTask = nextOwner;
}
else
{
}
}
}
public IReturnResultAsync TaskResult { get; set; }
public CriticalSection BlockingSection { get; set; }
public Task ParentTask { get; set; }
}
public class CriticalSection
{
public Task OwningTask { get; set; }
public List<Task> WaitingTasks { get; set; }
public int OwnershipCount { get; set; }
}
public class Scheduler
{
static Scheduler()
{
Current = new Scheduler();
}
private Scheduler()
{
Tasks = new List<Task>();
Sections = new Dictionary<object, CriticalSection>();
}
private Dictionary<object, CriticalSection> Sections { get; set; }
public CriticalSection FindCriticalSection(object identifier)
{
if (!Current.Sections.ContainsKey(identifier))
{
Current.Sections[identifier] = new CriticalSection();
}
return Current.Sections[identifier];
}
private List<Task> Tasks { get; set; }
private bool _executing;
public Task Exec(IEnumerable<IAsync> a, Task parent = null)
{
var mt = new Task(a) { ParentTask = parent };
Tasks.Add(mt);
return mt;
}
public Task CurrentTask { get; private set; }
public void Run()
{
while (Tasks.Count > 0)
{
Step();
}
}
public void Step()
{
if (_executing)
{
throw new InvalidOperationException();
}
_executing = true;
foreach (var task in Tasks.ToArray())
{
CurrentTask = task;
task.Step();
if (task.Finished) Tasks.Remove(task);
}
_executing = false;
}
public static Scheduler Current { get; private set; }
}
public enum LockIntent
{
Lock,
Unlock
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment