Skip to content

Instantly share code, notes, and snippets.

@artem1
Created January 29, 2014 16:37
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save artem1/8691852 to your computer and use it in GitHub Desktop.
Save artem1/8691852 to your computer and use it in GitHub Desktop.
Sample of throttling when async pool datasource by event stream from redis, with unit test, c#, atomic, interlocked, lock-free
public class WorkQueueWatchService: IDisposable
{
private readonly INotificationService notifications;
private readonly IWorkQueue queue;
private readonly Timer pullTimer;
private readonly int throttle;
private long changes;
private DateTime lastRun;
public WorkQueueWatchService(INotificationService ns, IWorkQueue wq, int throttleDelay = 2000)
{
if (ns == null)
throw new ArgumentNullException("ns");
if (wq == null)
throw new ArgumentNullException("wq");
notifications = ns;
queue = wq;
throttle = Math.Max(0, throttleDelay);
lastRun = DateTime.MinValue;
pullTimer = new Timer(notused => Process(), null, Timeout.Infinite, Timeout.Infinite);
ns.SubscribeToReportChanged(OnReportChange);
Process();
}
private void OnReportChange(string type, string login, object dto)
{
//Only interesting in events, that change position in work queue
if (type != ReportEvents.CREATED && type != ReportEvents.RUNNED) return;
if (1 == Interlocked.Increment(ref changes))
{
//changes was 0 before increment - run notification
//if Process only completed and next we catch event - may be time delay less than throttle
//so, need run timer with appropriate delay
var msFromLastRun = Convert.ToInt64(DateTime.Now.Subtract(lastRun).TotalMilliseconds);
var dueDate = Math.Max(0, throttle - msFromLastRun);
//msFromLastRun > throttle ? 0 : throttle - msFromLastRun;
pullTimer.Change(dueDate, Timeout.Infinite);
}
}
private void Process()
{
var prevChanges = Interlocked.CompareExchange(ref changes, 0, 0); //simple noop read operation, not needed to volatile changes
var snapshot = queue.Snapshot();
//TODO send notifications
if (prevChanges < Interlocked.CompareExchange(ref changes, 0, 0))
{
//while notifications was processed there new events catched,
//plan for new run after throttling period
pullTimer.Change(throttle, Timeout.Infinite);
}
else
{
//while notification was processed, no new events catched
//reset counter and wait for next event, that set counter to 1 and start timer again (see OnReportChange)
lastRun = DateTime.Now;
Interlocked.Exchange(ref changes, 0);
}
}
public void Dispose()
{
pullTimer.Dispose();
}
}
[Test]
public void WatcherThrottleQuering()
{
var wq = Substitute.For<IWorkQueue>();
var callsTime = new List<DateTime>();
wq.Snapshot().Returns(call =>
{
callsTime.Add(DateTime.Now);
return MakeEmptySnapshot();
});
Action<string, string, object> subscriber = null;
var ns = Substitute.For<INotificationService>();
ns.WhenForAnyArgs(x => x.SubscribeToReportChanged(null)).Do(x => subscriber = x.Arg<Action<string, string, object>>());
const int throttleDelay = 50; //lower value may exceed system timer accuracy
const double epsilon = throttleDelay*0.1;//10%
const int eventsCount = 40;
var watcher = new WorkQueueWatchService(ns, wq, throttleDelay);
Task[] tasks = Enumerable.Range(0, eventsCount)
.Select(i => Task.Run(() =>
{
Thread.Sleep((int) (throttleDelay * 0.9));
subscriber(ReportEvents.CREATED, "user", null);
}))
.ToArray();
Task.WaitAll(tasks);
Thread.Sleep(throttleDelay * (eventsCount + 1));
for (var i = 1; i < callsTime.Count; i++)
{
var a = callsTime[i - 1];
var b = callsTime[i];
var diff = b.Subtract(a);
Debug.WriteLine(diff);
Assert.That(diff.TotalMilliseconds, Is.GreaterThanOrEqualTo(throttleDelay - epsilon));
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment