secret
Created

  • Download Gist
gistfile1.cs
C#
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75
abstract class KeyedOperation
{
public string Key { get; set; }
public int Id { get; set; }
 
public abstract void EvaluateFunc();
}
 
class KeyedOperation<T> : KeyedOperation
{
public Func<T> Func { get; set; }
public T Value { get; protected set; }
public readonly AsyncSubject<T> Result = new AsyncSubject<T>();
 
public override void EvaluateFunc()
{
try
{
Value = Func();
Result.OnNext(Value);
Result.OnCompleted();
}
catch(Exception ex)
{
Result.OnError(ex);
}
}
}
 
public abstract class KeyedOperationQueue : IEnableLogger
{
static int sequenceNumber = 1;
readonly Subject<KeyedOperation> queuedOps = new Subject<KeyedOperation>();
readonly IConnectableObservable<KeyedOperation> resultObs;
 
protected KeyedOperationQueue()
{
resultObs = queuedOps
.GroupBy(x => x.Key)
.Select(x => x.Select(ProcessOperation).Concat())
.Merge()
.Multicast(new Subject<KeyedOperation>());
 
resultObs.Connect();
}
 
/// <summary>
/// Queue an operation to run in the background. All operations with the same key will run in sequence,
/// waiting for the previous operation to complete.
/// </summary>
/// <param name = "key"></param>
/// <param name = "action"></param>
public IObservable<Unit> EnqueueOperation(string key, Action action)
{
return EnqueueOperation(key, () => { action(); return Unit.Default; });
}
 
public IObservable<T> EnqueueOperation<T>(string key, Func<T> calculationFunc)
{
int id = Interlocked.Increment(ref sequenceNumber);
key = key ?? "__NONE__";
 
this.Log().InfoFormat("Queuing item {0} with key {1}", id, key);
var item = new KeyedOperation<T>() {Key = key, Func = calculationFunc, Id = id};
queuedOps.OnNext(item);
 
return item.Result;
}
 
static IObservable<KeyedOperation> ProcessOperation(KeyedOperation operation)
{
return Observable.Defer(() => Observable.Start(operation.EvaluateFunc, RxApp.TaskpoolScheduler))
.Select(_ => operation);
}
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.