Skip to content

Instantly share code, notes, and snippets.

@kakai248
Created October 12, 2017 14:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kakai248/771d084b0eb1b914b6c86d7e09a8efad to your computer and use it in GitHub Desktop.
Save kakai248/771d084b0eb1b914b6c86d7e09a8efad to your computer and use it in GitHub Desktop.
import java.util.concurrent.TimeUnit;
import io.reactivex.ObservableOperator;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
/**
 * Observable operator that forwards an item downstream only when at least
 * {@code intervalDuration} (expressed in {@code unit}) has elapsed since the
 * previous item it forwarded; items arriving sooner are dropped.
 *
 * <p>The first item of every subscription is always forwarded. Throttle state
 * is kept per subscription (per call to {@link #apply(Observer)}), so one
 * operator instance can be used by several subscribers without them
 * suppressing each other's items.
 *
 * <p>Elapsed time is measured with {@link System#nanoTime()}. The per-subscription
 * state is held in plain fields and therefore relies on the upstream delivering
 * {@code onNext} calls serially.
 *
 * @param <T> the type of items passing through the operator
 */
public class FilterWithTimeout<T> implements ObservableOperator<T, T> {

    private final long intervalDuration;
    private final TimeUnit unit;

    /**
     * Creates the operator.
     *
     * @param intervalDuration minimum time that must elapse between two forwarded items
     * @param unit             time unit of {@code intervalDuration}
     * @throws NullPointerException if {@code unit} is {@code null}
     */
    public FilterWithTimeout(long intervalDuration, TimeUnit unit) {
        if (unit == null) {
            // Fail fast here instead of with an NPE later inside onNext.
            throw new NullPointerException("unit == null");
        }
        this.intervalDuration = intervalDuration;
        this.unit = unit;
    }

    /**
     * Wraps the downstream observer with one that drops items arriving before
     * the configured interval has elapsed since the last forwarded item.
     *
     * @param observer the downstream observer
     * @return an observer to be subscribed to the upstream
     */
    @Override
    public Observer<? super T> apply(final Observer<? super T> observer) throws Exception {
        // The interval never changes, so convert it once rather than on every item.
        final long intervalNanos = unit.toNanos(intervalDuration);

        return new Observer<T>() {
            // State is scoped to this subscription, not to the operator instance.
            private boolean hasEmitted;
            private long lastEmissionNanos;

            @Override
            public void onSubscribe(Disposable d) {
                observer.onSubscribe(d);
            }

            @Override
            public void onNext(T t) {
                long now = System.nanoTime();
                // System.nanoTime() has an arbitrary (possibly negative) origin, so a
                // sentinel timestamp such as 0 cannot stand for "never emitted";
                // an explicit flag is used instead. Only differences between two
                // nanoTime() readings are meaningful.
                if (!hasEmitted || now - lastEmissionNanos >= intervalNanos) {
                    observer.onNext(t);
                    hasEmitted = true;
                    lastEmissionNanos = now;
                }
            }

            @Override
            public void onError(Throwable e) {
                observer.onError(e);
            }

            @Override
            public void onComplete() {
                observer.onComplete();
            }
        };
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment