Created
January 29, 2014 16:37
-
-
Save artem1/8691852 to your computer and use it in GitHub Desktop.
Sample of throttling when asynchronously polling a data source in response to an event stream from Redis, with a unit test (C#, atomic, Interlocked, lock-free)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Watches report-change events and, in response, takes snapshots of the work queue
/// no more often than once per throttle period. Coordination between the event
/// callback and the timer callback is lock-free and relies on <see cref="Interlocked"/>.
/// </summary>
public class WorkQueueWatchService: IDisposable
{
    private readonly INotificationService notifications;
    private readonly IWorkQueue queue;
    private readonly Timer pullTimer;
    private readonly int throttle;

    // Number of relevant events seen since the counter was last reset to 0.
    // The 0 -> 1 transition is what arms the timer (see OnReportChange).
    private long changes;

    // UTC ticks of the most recent completed Process pass. Stored as a long so it can be
    // read and written atomically on every platform.
    private long lastRunTicks;

    // Set by Dispose; lets late events and already-queued timer callbacks bail out.
    private volatile bool disposed;

    /// <summary>
    /// Subscribes to report-change events and performs an initial processing pass.
    /// </summary>
    /// <param name="ns">Notification service used to subscribe to report changes.</param>
    /// <param name="wq">Work queue whose snapshot is taken on every processing pass.</param>
    /// <param name="throttleDelay">Minimum delay between passes, in milliseconds; negative values are treated as 0.</param>
    /// <exception cref="ArgumentNullException"><paramref name="ns"/> or <paramref name="wq"/> is null.</exception>
    public WorkQueueWatchService(INotificationService ns, IWorkQueue wq, int throttleDelay = 2000)
    {
        if (ns == null)
            throw new ArgumentNullException("ns");
        if (wq == null)
            throw new ArgumentNullException("wq");

        notifications = ns;
        queue = wq;
        throttle = Math.Max(0, throttleDelay);
        lastRunTicks = DateTime.MinValue.Ticks;

        // Pre-arm the counter: an event that arrives while the initial pass below is still
        // running must not see a 0 -> 1 transition (which would fire the timer immediately,
        // concurrently with the initial pass). Process() detects such events and schedules
        // a throttled follow-up run itself.
        changes = 1;

        pullTimer = new Timer(notused => Process(), null, Timeout.Infinite, Timeout.Infinite);
        ns.SubscribeToReportChanged(OnReportChange);
        Process();
    }

    /// <summary>
    /// Event callback: counts the event and, if it is the first one since the last reset,
    /// arms the timer so that the next pass runs no sooner than one throttle period after
    /// the previous pass.
    /// </summary>
    private void OnReportChange(string type, string login, object dto)
    {
        // Only events that change a position in the work queue are of interest.
        if (type != ReportEvents.CREATED && type != ReportEvents.RUNNED) return;

        if (1 == Interlocked.Increment(ref changes))
        {
            // The counter was 0 before the increment, so no run is pending: schedule one.
            // If the previous pass finished only recently, wait out the rest of the throttle period.
            var elapsedMs = (DateTime.UtcNow.Ticks - Interlocked.Read(ref lastRunTicks)) / TimeSpan.TicksPerMillisecond;

            // Clamp to [0, throttle]: a backwards clock adjustment would otherwise produce
            // a delay longer than the throttle period.
            var dueMs = Math.Min(throttle, Math.Max(0L, throttle - elapsedMs));
            Schedule(dueMs);
        }
    }

    /// <summary>
    /// Takes a snapshot of the queue, then either resets the event counter (no events
    /// arrived meanwhile) or schedules another pass after the throttle period.
    /// </summary>
    private void Process()
    {
        // A timer callback may already be queued when Dispose is called.
        if (disposed) return;

        var observed = Interlocked.Read(ref changes);
        var snapshot = queue.Snapshot();
        //TODO send notifications

        // Publish the completion time BEFORE the counter can drop to 0, so that an event
        // which later sees the 0 -> 1 transition computes its delay from this pass.
        Interlocked.Exchange(ref lastRunTicks, DateTime.UtcNow.Ticks);

        // Reset the counter only if it still holds the value observed before the snapshot.
        // Doing the check and the reset as one atomic step guarantees that an event arriving
        // at any point during this pass is never silently discarded.
        if (Interlocked.CompareExchange(ref changes, 0, observed) != observed)
        {
            // New events were counted while this pass was running: run again after the throttle period.
            Schedule(throttle);
        }
        // Otherwise the counter is now 0 and the next event will arm the timer (see OnReportChange).
    }

    /// <summary>
    /// Arms the one-shot timer; does nothing once the service has been disposed.
    /// </summary>
    private void Schedule(long dueMs)
    {
        if (disposed) return;
        try
        {
            pullTimer.Change(dueMs, Timeout.Infinite);
        }
        catch (ObjectDisposedException)
        {
            // Dispose raced with this call; there is nothing left to schedule.
        }
    }

    /// <summary>
    /// Stops further processing and releases the timer.
    /// </summary>
    public void Dispose()
    {
        disposed = true;
        pullTimer.Dispose();
    }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// <summary>
/// Fires a burst of events at the watcher from many threads and verifies that consecutive
/// queue snapshots are separated by at least the throttle delay (within a 10% tolerance).
/// </summary>
[Test]
public void WatcherThrottleQuering()
{
    var wq = Substitute.For<IWorkQueue>();
    var callsTime = new List<DateTime>();
    wq.Snapshot().Returns(call =>
    {
        // Snapshot() is invoked from timer threads; List<T> is not safe for concurrent use.
        lock (callsTime)
        {
            callsTime.Add(DateTime.UtcNow);
        }
        return MakeEmptySnapshot();
    });

    // Capture the handler the watcher registers so the test can raise events directly.
    Action<string, string, object> subscriber = null;
    var ns = Substitute.For<INotificationService>();
    ns.WhenForAnyArgs(x => x.SubscribeToReportChanged(null)).Do(x => subscriber = x.Arg<Action<string, string, object>>());

    const int throttleDelay = 50; //lower value may exceed system timer accuracy
    const double epsilon = throttleDelay*0.1;//10%
    const int eventsCount = 40;

    DateTime[] recorded;
    // Dispose the watcher (and its timer) even when an assertion below fails.
    using (var watcher = new WorkQueueWatchService(ns, wq, throttleDelay))
    {
        Task[] tasks = Enumerable.Range(0, eventsCount)
            .Select(i => Task.Run(() =>
            {
                Thread.Sleep((int) (throttleDelay * 0.9));
                subscriber(ReportEvents.CREATED, "user", null);
            }))
            .ToArray();
        Task.WaitAll(tasks);

        // Give the watcher enough time to drain every pending throttled run.
        Thread.Sleep(throttleDelay * (eventsCount + 1));

        lock (callsTime)
        {
            recorded = callsTime.ToArray();
        }
    }

    // One snapshot is taken by the constructor and at least one more must follow the events;
    // without this check the loop below could pass without comparing anything.
    Assert.That(recorded.Length, Is.GreaterThanOrEqualTo(2));

    for (var i = 1; i < recorded.Length; i++)
    {
        var diff = recorded[i].Subtract(recorded[i - 1]);
        Debug.WriteLine(diff);
        Assert.That(diff.TotalMilliseconds, Is.GreaterThanOrEqualTo(throttleDelay - epsilon));
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment