Skip to content

Instantly share code, notes, and snippets.

@TonyTangAndroid
Created September 9, 2023 05:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save TonyTangAndroid/0f7253f6cc80cdaa88cc1f97fdeb7359 to your computer and use it in GitHub Desktop.
Save TonyTangAndroid/0f7253f6cc80cdaa88cc1f97fdeb7359 to your computer and use it in GitHub Desktop.
Buffer Debounce RxJava 2 Util
package aa.rx;
import io.reactivex.Observable;
import io.reactivex.ObservableTransformer;
import io.reactivex.Scheduler;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
 * Static factory for an RxJava 2 transformer that buffers items until the upstream goes quiet.
 *
 * <p><a href="https://stackoverflow.com/a/49866518/4068957">Inspired</a> by a Stack Overflow
 * answer.
 */
public final class BufferDebounceUtil {

  /** Static utility holder; not meant to be instantiated. */
  private BufferDebounceUtil() {
  }

  /**
   * Creates a transformer that collects upstream items into a list and emits that list once the
   * upstream has been silent for the given duration. Items still buffered when the upstream
   * completes are emitted as a final list.
   *
   * @param time length of the quiet period that closes the current buffer
   * @param unit unit of {@code time}
   * @param scheduler scheduler on which the debounce timer runs
   * @param <T> type of the upstream items
   * @return a transformer turning an {@code Observable<T>} into an {@code Observable<List<T>>}
   * @throws NullPointerException if {@code unit} or {@code scheduler} is {@code null}
   */
  public static <T> ObservableTransformer<T, List<T>> bufferDebounce(
      long time, TimeUnit unit, Scheduler scheduler) {
    // Fail fast at the call site; otherwise a null argument is only noticed when the
    // publish selector runs at subscription time.
    Objects.requireNonNull(unit, "unit is null");
    Objects.requireNonNull(scheduler, "scheduler is null");
    // publish(selector) lets the buffer and its boundary share one upstream subscription.
    return rawUpstream ->
        rawUpstream.publish(upstream -> bufferWithBoundary(upstream, time, unit, scheduler));
  }

  /** Buffers {@code upstream}, closing the current buffer whenever the boundary emits. */
  private static <T> Observable<List<T>> bufferWithBoundary(
      Observable<T> upstream, long time, TimeUnit unit, Scheduler scheduler) {
    return upstream.buffer(boundary(upstream, time, unit, scheduler));
  }

  /**
   * Builds the boundary signal: the debounced upstream, cut off when the upstream completes.
   *
   * <p>The {@code takeUntil} is there to prevent the completion of the upstream from triggering
   * an empty buffer.
   */
  private static <T> Observable<T> boundary(
      Observable<T> upstream, long time, TimeUnit unit, Scheduler scheduler) {
    return upstream.debounce(time, unit, scheduler).takeUntil(complete(upstream));
  }

  /** Returns an observable that drops every item of {@code upstream} and only terminates. */
  private static <T> Observable<Object> complete(Observable<T> upstream) {
    return upstream.ignoreElements().toObservable();
  }
}
package aa.rx;
import com.google.common.collect.ImmutableList;
import io.reactivex.ObservableTransformer;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.TestScheduler;
import io.reactivex.subjects.PublishSubject;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
/**
 * Tests for {@link BufferDebounceUtil#bufferDebounce} using a 200 ms quiet window.
 *
 * <p>Time is driven by a {@link TestScheduler}, so every timing step below is virtual.
 */
public class BufferDebounceUtilTest {

  // JUnit 4 builds a new instance of this class for every test method, so each test gets its
  // own subject, scheduler and subscribed observer.
  private final PublishSubject<Integer> source = PublishSubject.create();
  private final TestScheduler scheduler = new TestScheduler();
  private final ObservableTransformer<Integer, List<Integer>> composer =
      BufferDebounceUtil.bufferDebounce(200, TimeUnit.MILLISECONDS, scheduler);
  private final TestObserver<List<Integer>> observer = source.compose(composer).test();

  /** Moves the virtual clock to the absolute time {@code millis}. */
  private void advanceTo(long millis) {
    scheduler.advanceTimeTo(millis, TimeUnit.MILLISECONDS);
  }

  /** Moves the virtual clock forward by {@code millis}. */
  private void advanceBy(long millis) {
    scheduler.advanceTimeBy(millis, TimeUnit.MILLISECONDS);
  }

  /** Lets one idle hour pass on the virtual clock. */
  private void prolongTheTimeline() {
    scheduler.advanceTimeBy(1, TimeUnit.HOURS);
  }

  /** A fresh subscription has neither failed, completed, nor emitted anything. */
  private void assertFreshSubscription() {
    observer.assertNoErrors();
    observer.assertNotComplete();
    observer.assertNoValues();
  }

  /** Drives the clock with absolute timestamps. */
  @Test
  public void advanceTimeTo() {
    assertFreshSubscription();

    source.onNext(1);
    source.onNext(2);
    observer.assertNoValues();

    advanceTo(100);
    observer.assertNoValues();
    source.onNext(3);
    observer.assertNoValues();

    advanceTo(150);
    observer.assertValueCount(0);
    source.onNext(4);
    observer.assertValueCount(0);

    // Last item arrived at 150 ms, so the quiet window has elapsed by 400 ms.
    advanceTo(400);
    observer.assertValueCount(1);
    observer.assertValue(ImmutableList.of(1, 2, 3, 4));

    source.onNext(5);
    observer.assertValueCount(1);
    advanceTo(450);
    observer.assertValueCount(1);
    source.onNext(6);
    observer.assertValueCount(1);

    advanceTo(800);
    observer.assertValueCount(2);
    observer.assertValueAt(1, ImmutableList.of(5, 6));

    // Completion flushes the pending item immediately, without waiting for the window.
    source.onNext(7);
    source.onComplete();
    observer.assertValueCount(3);
    observer.assertValueAt(2, ImmutableList.of(7));

    advanceTo(850);
    observer.assertValueCount(3);
  }

  /** Drives the clock with relative steps, separated by long idle stretches. */
  @Test
  public void advanceTimeBy() {
    assertFreshSubscription();

    source.onNext(1);
    source.onNext(2);
    observer.assertNoValues();

    advanceBy(100);
    observer.assertNoValues();
    source.onNext(3);
    observer.assertNoValues();

    // 150 ms after the last item: still inside the 200 ms window.
    advanceBy(150);
    observer.assertNoValues();
    // The remaining 50 ms closes the window.
    advanceBy(50);
    observer.assertValue(ImmutableList.of(1, 2, 3));

    prolongTheTimeline();
    source.onNext(4);
    observer.assertValueCount(1);
    advanceBy(400);
    observer.assertValueCount(2);
    observer.assertValueAt(1, ImmutableList.of(4));

    prolongTheTimeline();
    source.onNext(5);
    observer.assertValueCount(2);
    advanceBy(450);
    observer.assertValueCount(3);
    observer.assertValueAt(2, ImmutableList.of(5));

    prolongTheTimeline();
    source.onNext(6);
    observer.assertValueCount(3);
    advanceBy(800);
    observer.assertValueCount(4);
    observer.assertValueAt(3, ImmutableList.of(6));

    prolongTheTimeline();
    source.onNext(7);
    source.onComplete();
    observer.assertValueCount(5);
    observer.assertValueAt(4, ImmutableList.of(7));

    advanceBy(850);
    observer.assertValueCount(5);
  }

  /** Mimics bursts of images arriving in separate iterations with long pauses in between. */
  @Test
  public void advanceTimeByRealUseCase() {
    assertFreshSubscription();
    advanceBy(1000);
    observer.assertNoValues();

    // Iteration 1: first image.
    source.onNext(1);
    advanceBy(80);
    // Iteration 1: second image.
    source.onNext(2);
    advanceBy(100);
    observer.assertNoValues();
    // Iteration 1: concluded once 200 ms have passed since the second image.
    advanceBy(100);
    observer.assertValueCount(1);
    observer.assertValue(ImmutableList.of(1, 2));

    // Iteration 2: starts after a long pause.
    advanceBy(10000);
    // Iteration 2: first and only image.
    source.onNext(3);
    advanceBy(150);
    observer.assertValueCount(1);
    advanceBy(50);
    observer.assertValueCount(2);
    observer.assertValueAt(1, ImmutableList.of(3));

    // Iteration 3: starts after another long pause.
    advanceBy(10000);
    // Iteration 3: images arrive 199 ms apart, each just inside the window.
    source.onNext(4);
    advanceBy(199);
    source.onNext(5);
    advanceBy(199);
    source.onNext(6);
    advanceBy(199);
    observer.assertValueCount(2);
    // One more millisecond completes the window after the last image.
    advanceBy(1);
    observer.assertValueCount(3);
    observer.assertValueAt(2, ImmutableList.of(4, 5, 6));
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment