Skip to content

Instantly share code, notes, and snippets.

@nsk-mironov
Last active August 29, 2015 14:26
Show Gist options
  • Save nsk-mironov/ebf3c452843bbba81651 to your computer and use it in GitHub Desktop.
Save nsk-mironov/ebf3c452843bbba81651 to your computer and use it in GitHub Desktop.
OperatorThrottleList
public final class OperatorThrottleList<T> implements Operator<T, List<T>> {
private final Scheduler scheduler;
private final long interval;
public OperatorThrottleList(final long interval, final TimeUnit unit, final Scheduler scheduler) {
this.interval = unit.toMillis(interval);
this.scheduler = scheduler;
}
@Override
public Subscriber<? super List<T>> call(final Subscriber<? super T> subscriber) {
return new Subscriber<List<T>>(subscriber) {
private final SerialSubscription pending = new SerialSubscription();
private final Worker worker = scheduler.createWorker();
private final List<T> buffer = new ArrayList<T>();
private long lastEmit = 0;
{
subscriber.add(pending);
subscriber.add(worker);
}
@Override
public void onStart() {
request(Long.MAX_VALUE);
}
@Override
public void onNext(final T value) {
final long now = scheduler.now();
buffer.add(value);
if (lastEmit == 0 || now - lastEmit >= interval) {
emitCurrentBuffer();
} else {
pending.set(worker.schedule(() -> {
emitCurrentBuffer();
}, interval, TimeUnit.MILLISECONDS);
}
}
@Override
public void onCompleted() {
emitCurrentBuffer();
subscriber.onCompleted();
}
@Override
public void onError(final Throwable error) {
emitCurrentBuffer();
subscriber.onError(error);
}
private void emitCurrentBuffer() {
if (!buffer.isEmpty()) {
subscriber.onNext(new ArrayList<T>(buffer);
pending.set(Subscriptions.empty());
lastEmit = scheduler.now();
buffer.clear();
}
}
};
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment