Skip to content

Instantly share code, notes, and snippets.

@Horusiath
Last active July 24, 2020 17:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Horusiath/1232377d4c1b5f3cbd4ec9a085213a75 to your computer and use it in GitHub Desktop.
Save Horusiath/1232377d4c1b5f3cbd4ec9a085213a75 to your computer and use it in GitHub Desktop.
Throttling operator for Rx.NET
using System;
using System.Collections.Concurrent;
using System.Threading;
namespace CsDemo
{
public static class ObservableExtensions
{
    /// <summary>
    /// Rate-limits <paramref name="source"/>: up to <paramref name="count"/> items are passed downstream
    /// within each <paramref name="interval"/>. Items arriving after the quota is used up are buffered
    /// until the interval resets.
    /// </summary>
    /// <typeparam name="T">Type of the items carried by the sequence.</typeparam>
    /// <param name="source">Sequence to rate-limit.</param>
    /// <param name="count">Number of items allowed through within a single interval.</param>
    /// <param name="interval">Length of the window after which the quota is reset.</param>
    /// <returns>A new throttling observable wrapping <paramref name="source"/>.</returns>
    public static IObservable<T> Throttle<T>(this IObservable<T> source, int count, TimeSpan interval)
    {
        var throttled = new Throttle<T>(source, count, interval);
        return throttled;
    }
}
/// <summary>
/// Observable that lets at most <c>count</c> items of the wrapped source through per <c>interval</c>.
/// Items arriving after the quota of the current interval is used up are buffered and released,
/// in arrival order, on the following interval ticks.
/// </summary>
/// <typeparam name="T">Type of the items carried by the sequence.</typeparam>
public class Throttle<T> : IObservable<T>
{
    private readonly IObservable<T> _source;
    private readonly int _count;
    private readonly TimeSpan _interval;

    /// <summary>
    /// Creates a throttled view over <paramref name="source"/>.
    /// </summary>
    /// <param name="source">Sequence to rate-limit.</param>
    /// <param name="count">Number of items allowed through per interval; must be positive.</param>
    /// <param name="interval">Length of a single quota window; must be positive.</param>
    /// <exception cref="ArgumentNullException"><paramref name="source"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException">
    /// <paramref name="count"/> or <paramref name="interval"/> is zero or negative.
    /// </exception>
    public Throttle(IObservable<T> source, int count, TimeSpan interval)
    {
        if (source == null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        // A non-positive quota would buffer every item forever without ever emitting one.
        if (count <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(count), count, "Count must be greater than zero.");
        }

        // A zero period makes the timer fire only once, so the quota would never reset again.
        if (interval <= TimeSpan.Zero)
        {
            throw new ArgumentOutOfRangeException(nameof(interval), interval, "Interval must be greater than zero.");
        }

        _source = source;
        _count = count;
        _interval = interval;
    }

    /// <summary>
    /// Subscribes <paramref name="observer"/> to the throttled sequence. Each subscription gets its own
    /// quota, buffer and timer.
    /// </summary>
    /// <param name="observer">Downstream observer receiving the throttled items.</param>
    /// <returns>
    /// A handle that, when disposed, stops the interval timer and disposes the subscription to the source.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    public IDisposable Subscribe(IObserver<T> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException(nameof(observer));
        }

        var throttled = new Observer(observer, _count, _interval);
        try
        {
            throttled.SetUpstream(_source.SubscribeSafe(throttled));
        }
        catch
        {
            // Do not leave a running timer behind if subscribing to the source failed.
            throttled.Dispose();
            throw;
        }

        return throttled;
    }

    /// <summary>
    /// Per-subscription state: forwards items while quota is available, buffers the rest and
    /// releases them from the timer callback. Also serves as the subscription handle.
    /// </summary>
    private sealed class Observer : IObserver<T>, IDisposable
    {
        // Serializes every call into the downstream observer: notifications may originate
        // from the source's thread and from the timer callback's thread.
        private readonly object _gate = new object();
        private readonly IObserver<T> _observer;
        private readonly int _count;
        private readonly ConcurrentQueue<T> _buffer = new ConcurrentQueue<T>();
        private readonly Timer _timer;

        private IDisposable _upstream;

        // Items that may still be emitted directly in the current interval (guarded by _gate).
        private int _remaining;

        // Source completed while items were still buffered; completion is sent once they are drained.
        private bool _draining;

        // Terminal flag: set on error, completion or unsubscription. Nothing is emitted afterwards.
        private volatile bool _stopped;

        public Observer(IObserver<T> observer, int count, TimeSpan interval)
        {
            _observer = observer;
            _remaining = _count = count;
            _timer = new Timer(OnTick, null, interval, interval);
        }

        /// <summary>
        /// Stores the source subscription so it can be disposed together with the timer.
        /// If the sequence already terminated (e.g. synchronously during subscribe), it is disposed at once.
        /// </summary>
        public void SetUpstream(IDisposable upstream)
        {
            Interlocked.Exchange(ref _upstream, upstream);
            if (_stopped)
            {
                Interlocked.Exchange(ref _upstream, null)?.Dispose();
            }
        }

        /// <summary>
        /// Stops the timer and releases the source subscription. Safe to call more than once.
        /// Deliberately does not take the gate, so it never waits on an in-flight downstream call.
        /// </summary>
        public void Dispose()
        {
            _stopped = true;
            _timer.Dispose();
            Interlocked.Exchange(ref _upstream, null)?.Dispose();
        }

        public void OnCompleted()
        {
            lock (_gate)
            {
                if (_stopped || _draining)
                {
                    return;
                }

                if (_buffer.IsEmpty)
                {
                    Dispose();
                    _observer.OnCompleted();
                }
                else
                {
                    // Keep the timer alive: it releases the buffered items and then signals completion.
                    _draining = true;
                }
            }
        }

        public void OnError(Exception error)
        {
            lock (_gate)
            {
                if (_stopped || _draining)
                {
                    return;
                }

                // Errors are forwarded immediately; buffered items are dropped and the timer is
                // stopped so nothing can be emitted after the terminal notification.
                Dispose();
                _observer.OnError(error);
            }
        }

        public void OnNext(T value)
        {
            lock (_gate)
            {
                if (_stopped || _draining)
                {
                    return;
                }

                // Emit directly only when quota is left AND nothing is waiting in the buffer,
                // otherwise a new item would overtake older buffered ones.
                if (_remaining > 0 && _buffer.IsEmpty)
                {
                    _remaining--;
                    _observer.OnNext(value);
                }
                else
                {
                    _buffer.Enqueue(value);
                }
            }
        }

        /// <summary>
        /// Timer callback marking the start of a new interval: releases up to <c>_count</c> buffered
        /// items and makes whatever quota is left available for direct emission.
        /// </summary>
        private void OnTick(object state)
        {
            lock (_gate)
            {
                if (_stopped)
                {
                    return;
                }

                var quota = _count;
                while (quota > 0 && _buffer.TryDequeue(out var value))
                {
                    quota--;
                    _observer.OnNext(value);
                    if (_stopped)
                    {
                        // Downstream unsubscribed while handling the item.
                        return;
                    }
                }

                _remaining = quota;

                if (_draining && _buffer.IsEmpty)
                {
                    Dispose();
                    _observer.OnCompleted();
                }
            }
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment