Skip to content

Instantly share code, notes, and snippets.

@ahancock1
Created February 22, 2021 22:14
Show Gist options
  • Save ahancock1/ddbacedb59ab144c6dbce03a44af10da to your computer and use it in GitHub Desktop.
Save ahancock1/ddbacedb59ab144c6dbce03a44af10da to your computer and use it in GitHub Desktop.
Reactive rate gate that uses observables to asynchronously limit access to resources
using System;
using System.Collections.Concurrent;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
/// <summary>
/// Sliding-window rate limiter exposed as an observable gate.
/// Each subscription to <see cref="Throttle"/> completes once the caller is
/// allowed to proceed, i.e. once fewer than <see cref="Limit"/> requests have
/// been admitted during the trailing <see cref="Interval"/>.
/// </summary>
public class RateLimiter : IDisposable
{
    // UTC timestamps of admitted requests, oldest first.
    private readonly ConcurrentQueue<DateTime> _requests =
        new ConcurrentQueue<DateTime>();

    // Async-friendly mutex: only one caller at a time inspects the window,
    // waits for capacity and records its own timestamp. Unlike a blocking
    // wait handle, waiting on it does not tie up a thread-pool thread.
    private readonly SemaphoreSlim _gate =
        new SemaphoreSlim(1, 1);

    /// <summary>
    /// Creates a limiter that admits at most <paramref name="limit"/> requests
    /// per <paramref name="interval"/>.
    /// </summary>
    /// <param name="interval">Length of the sliding window.</param>
    /// <param name="limit">Maximum number of requests admitted per window.</param>
    public RateLimiter(TimeSpan interval, int limit)
    {
        Interval = interval;
        Limit = limit;
    }

    /// <summary>Length of the sliding window.</summary>
    public TimeSpan Interval { get; set; }

    /// <summary>Maximum number of requests admitted per window.</summary>
    public int Limit { get; set; }

    /// <summary>
    /// Releases the internal gate. Subscriptions that are still in flight when
    /// this is called may fail with <see cref="ObjectDisposedException"/>.
    /// </summary>
    public void Dispose()
    {
        _gate.Dispose();
    }

    /// <summary>
    /// Returns a cold observable that, when subscribed, waits (without blocking
    /// a thread) until the rate limit allows another request, records the
    /// request, then emits a single <see cref="Unit"/> and completes.
    /// Disposing the subscription cancels a pending wait.
    /// </summary>
    public IObservable<Unit> Throttle()
    {
        return Observable
            .FromAsync(token => AcquireAsync(token))
            .SubscribeOn(TaskPoolScheduler.Default);
    }

    // Serializes callers through the gate, waits for capacity and records the
    // admitted request. The gate is released in a finally block so that
    // cancellation or a fault can never leave it permanently held.
    private async Task AcquireAsync(CancellationToken token)
    {
        // If this throws because of cancellation the gate was not acquired,
        // so it must stay outside the try/finally that releases it.
        await _gate.WaitAsync(token).ConfigureAwait(false);
        try
        {
            await WaitForCapacityAsync(token).ConfigureAwait(false);
            _requests.Enqueue(DateTime.UtcNow);
        }
        finally
        {
            _gate.Release();
        }
    }

    // Drops recorded timestamps that are older than the given cutoff.
    private void Purge(DateTime target)
    {
        while (_requests.TryPeek(out var timestamp) && timestamp < target)
        {
            _requests.TryDequeue(out _);
        }
    }

    // Delays until the oldest recorded request leaves the window when the
    // window is currently full; returns immediately otherwise.
    private async Task WaitForCapacityAsync(CancellationToken token)
    {
        // Snapshot the mutable property so the purge cutoff and the delay are
        // computed from the same window length.
        var interval = Interval;
        var now = DateTime.UtcNow;

        Purge(now - interval);

        // Only delay when there really is an oldest entry to wait for. With an
        // empty queue (possible when Limit <= 0) the peek fails and using the
        // default timestamp would produce a hugely negative delay, which
        // Task.Delay rejects with ArgumentOutOfRangeException.
        if (_requests.Count >= Limit && _requests.TryPeek(out var oldest))
        {
            var delay = oldest + interval - now;
            if (delay < TimeSpan.Zero)
            {
                // Defensive clamp: Task.Delay does not accept negative delays.
                delay = TimeSpan.Zero;
            }

            Console.WriteLine($"Throttle: {delay}");
            await Task.Delay(delay, token).ConfigureAwait(false);
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment