Skip to content

Instantly share code, notes, and snippets.

@anaisbetts
Last active January 19, 2019 18:09
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save anaisbetts/6263961 to your computer and use it in GitHub Desktop.
Save anaisbetts/6263961 to your computer and use it in GitHub Desktop.
/// <summary>
/// Only lets the input observable fire at most once per <paramref name="interval"/>.
/// Unlike Throttle, an item fires immediately when it arrives while no
/// rate limit is in effect; the limit then stays in effect for
/// <paramref name="interval"/>, measured on <paramref name="scheduler"/>.
/// </summary>
/// <typeparam name="T">Element type of the source sequence.</typeparam>
/// <param name="This">The source sequence to rate-limit.</param>
/// <param name="interval">Length of the rate-limit window opened by each emitted item.</param>
/// <param name="scheduler">Scheduler used to time the rate-limit window.</param>
/// <returns>A sequence carrying the first item of each rate-limit window.</returns>
public static IObservable<T> RateLimit<T>(this IObservable<T> This, TimeSpan interval, IScheduler scheduler)
{
    // Non-null while a rate-limit window is open. The value stored is an
    // observable that fires immediately, used to close any window that is
    // opened while the limit is in effect.
    // NOTE(review): this state is captured once per RateLimit call, so it is
    // shared by every subscriber of the returned observable - confirm that
    // multiple subscriptions are not expected.
    var slot = default(IObservable<Unit>);

    // Share one subscription to the source: the same sequence is used both
    // as the window contents and as the window-opening signal.
    var input = This.Publish().RefCount();

    return input.Window(input, _ => {
        // Already rate-limited: hand back the immediately-firing closer.
        if (slot != null) {
            return slot;
        }

        slot = Observable.Return(Unit.Default);

        // Not rate-limited: keep this window open for 'interval', then
        // clear the slot so the next item can open a fresh window.
        return Observable.Return(Unit.Default)
            .Delay(interval, scheduler)
            .Finally(() => slot = null);
    })
    .Select(x => x.Take(1)) // at most one item per window
    .Merge();
}
// Verifies that RateLimit with a 1000ms interval lets an item through
// immediately when no limit is active and suppresses items that arrive
// while the limit is in effect.
[Test]
public void RateLimitingTest()
{
    (new TestScheduler()).With(sched => {
        var input = sched.CreateHotObservable(
            sched.OnNextAt(100, Unit.Default),  // Yes
            sched.OnNextAt(200, Unit.Default),  // Not >1100, No
            sched.OnNextAt(300, Unit.Default),  // Not >1100, No
            sched.OnNextAt(1000, Unit.Default), // Not >1100, No
            sched.OnNextAt(1200, Unit.Default), // >1100, Yes
            sched.OnNextAt(2100, Unit.Default), // Not >1200+1000, No
            sched.OnNextAt(2300, Unit.Default)  // >2200, Yes
        );

        var output = input.RateLimit(TimeSpan.FromMilliseconds(1000), sched)
            .Timestamp(sched).CreateCollection();

        // Nothing has been emitted before the first input item at 100ms.
        sched.AdvanceToMs(50);
        Assert.AreEqual(0, output.Count);

        // The first item is passed through immediately.
        sched.AdvanceToMs(150);
        Assert.AreEqual(1, output.Count);

        // Items at 200, 300 and 1000 arrive while rate-limited.
        sched.AdvanceToMs(1000);
        Assert.AreEqual(1, output.Count);

        // By the end only the items at 100, 1200 and 2300 were emitted.
        sched.AdvanceToMs(10 * 1000);
        Assert.AreEqual(3, output.Count);
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment