Skip to content

Instantly share code, notes, and snippets.

@anaisbetts
Created November 23, 2011 01:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anaisbetts/2877c8181d5ae4c8ac02 to your computer and use it in GitHub Desktop.
Save anaisbetts/2877c8181d5ae4c8ac02 to your computer and use it in GitHub Desktop.
abstract class KeyedOperation
{
public string Key { get; set; }
public int Id { get; set; }
public abstract void EvaluateFunc();
}
class KeyedOperation<T> : KeyedOperation
{
public Func<T> Func { get; set; }
public T Value { get; protected set; }
public readonly AsyncSubject<T> Result = new AsyncSubject<T>();
public override void EvaluateFunc()
{
try
{
Value = Func();
Result.OnNext(Value);
Result.OnCompleted();
}
catch(Exception ex)
{
Result.OnError(ex);
}
}
}
public abstract class KeyedOperationQueue : IEnableLogger
{
static int sequenceNumber = 1;
readonly Subject<KeyedOperation> queuedOps = new Subject<KeyedOperation>();
readonly IConnectableObservable<KeyedOperation> resultObs;
protected KeyedOperationQueue()
{
resultObs = queuedOps
.GroupBy(x => x.Key)
.Select(x => x.Select(ProcessOperation).Concat())
.Merge()
.Multicast(new Subject<KeyedOperation>());
resultObs.Connect();
}
/// <summary>
/// Queue an operation to run in the background. All operations with the same key will run in sequence,
/// waiting for the previous operation to complete.
/// </summary>
/// <param name = "key"></param>
/// <param name = "action"></param>
public IObservable<Unit> EnqueueOperation(string key, Action action)
{
return EnqueueOperation(key, () => { action(); return Unit.Default; });
}
public IObservable<T> EnqueueOperation<T>(string key, Func<T> calculationFunc)
{
int id = Interlocked.Increment(ref sequenceNumber);
key = key ?? "__NONE__";
this.Log().InfoFormat("Queuing item {0} with key {1}", id, key);
var item = new KeyedOperation<T>() {Key = key, Func = calculationFunc, Id = id};
queuedOps.OnNext(item);
return item.Result;
}
static IObservable<KeyedOperation> ProcessOperation(KeyedOperation operation)
{
return Observable.Defer(() => Observable.Start(operation.EvaluateFunc, RxApp.TaskpoolScheduler))
.Select(_ => operation);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment