-
-
Save anaisbetts/2877c8181d5ae4c8ac02 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
abstract class KeyedOperation | |
{ | |
public string Key { get; set; } | |
public int Id { get; set; } | |
public abstract void EvaluateFunc(); | |
} | |
class KeyedOperation<T> : KeyedOperation | |
{ | |
public Func<T> Func { get; set; } | |
public T Value { get; protected set; } | |
public readonly AsyncSubject<T> Result = new AsyncSubject<T>(); | |
public override void EvaluateFunc() | |
{ | |
try | |
{ | |
Value = Func(); | |
Result.OnNext(Value); | |
Result.OnCompleted(); | |
} | |
catch(Exception ex) | |
{ | |
Result.OnError(ex); | |
} | |
} | |
} | |
public abstract class KeyedOperationQueue : IEnableLogger | |
{ | |
static int sequenceNumber = 1; | |
readonly Subject<KeyedOperation> queuedOps = new Subject<KeyedOperation>(); | |
readonly IConnectableObservable<KeyedOperation> resultObs; | |
protected KeyedOperationQueue() | |
{ | |
resultObs = queuedOps | |
.GroupBy(x => x.Key) | |
.Select(x => x.Select(ProcessOperation).Concat()) | |
.Merge() | |
.Multicast(new Subject<KeyedOperation>()); | |
resultObs.Connect(); | |
} | |
/// <summary> | |
/// Queue an operation to run in the background. All operations with the same key will run in sequence, | |
/// waiting for the previous operation to complete. | |
/// </summary> | |
/// <param name = "key"></param> | |
/// <param name = "action"></param> | |
public IObservable<Unit> EnqueueOperation(string key, Action action) | |
{ | |
return EnqueueOperation(key, () => { action(); return Unit.Default; }); | |
} | |
public IObservable<T> EnqueueOperation<T>(string key, Func<T> calculationFunc) | |
{ | |
int id = Interlocked.Increment(ref sequenceNumber); | |
key = key ?? "__NONE__"; | |
this.Log().InfoFormat("Queuing item {0} with key {1}", id, key); | |
var item = new KeyedOperation<T>() {Key = key, Func = calculationFunc, Id = id}; | |
queuedOps.OnNext(item); | |
return item.Result; | |
} | |
static IObservable<KeyedOperation> ProcessOperation(KeyedOperation operation) | |
{ | |
return Observable.Defer(() => Observable.Start(operation.EvaluateFunc, RxApp.TaskpoolScheduler)) | |
.Select(_ => operation); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment