Skip to content

Instantly share code, notes, and snippets.

@benjchristensen
Last active March 1, 2024 15:46
Show Gist options
  • Star 27 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save benjchristensen/e4524a308456f3c21c0b to your computer and use it in GitHub Desktop.
Save benjchristensen/e4524a308456f3c21c0b to your computer and use it in GitHub Desktop.
DebounceBuffer: Use publish(), debounce() and buffer() together to capture bursts of events.
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
public class DebounceBuffer {
public static void main(String args[]) {
// debounce to the last value in each burst
// intermittentBursts().debounce(10, TimeUnit.MILLISECONDS).toBlocking().forEach(System.out::println);
/* The following will emit a buffered list as it is debounced */
// first we multicast the stream ... using refCount so it handles the subscribe/unsubscribe
Observable<Integer> burstStream = intermittentBursts().take(20).publish().refCount();
// then we get the debounced version
Observable<Integer> debounced = burstStream.debounce(10, TimeUnit.MILLISECONDS);
// then the buffered one that uses the debounced stream to demark window start/stop
Observable<List<Integer>> buffered = burstStream.buffer(debounced);
// then we subscribe to the buffered stream so it does what we want
buffered.toBlocking().forEach(System.out::println);
}
/**
* This is an artificial source to demonstrate an infinite stream that bursts intermittently
*/
public static Observable<Integer> intermittentBursts() {
return Observable.create((Subscriber<? super Integer> s) -> {
while (!s.isUnsubscribed()) {
// burst some number of items
for (int i = 0; i < Math.random() * 20; i++) {
s.onNext(i);
}
try {
// sleep for a random amount of time
// NOTE: Only using Thread.sleep here as an artificial demo.
Thread.sleep((long) (Math.random() * 1000));
} catch (Exception e) {
// do nothing
}
}
}).subscribeOn(Schedulers.newThread()); // use newThread since we are using sleep to block
}
}
import java.util.List;
import java.util.concurrent.TimeUnit;
import rx.Observable;
import rx.Subscriber;
import rx.schedulers.Schedulers;
/**
* Variant of above that uses `publish(Func1<? super Observable<T>, ? extends Observable<R>> selector)`
* which allows multicasting without the need to use `refcount()` which can result in race conditions.
* */
public class DebounceBufferPublish {
public static void main(String args[]) {
/* The following will emit a buffered list as it is debounced */
Observable<List<Integer>> buffered = intermittentBursts().take(20).publish(stream -> {
// inside the `publish` function we can access `stream` in a multicasted manner
return stream.buffer(stream.debounce(10, TimeUnit.MILLISECONDS));
});
buffered.toBlocking().forEach(System.out::println);
}
/**
* This is an artificial source to demonstrate an infinite stream that bursts intermittently
*/
public static Observable<Integer> intermittentBursts() {
return Observable.create((Subscriber<? super Integer> s) -> {
while (!s.isUnsubscribed()) {
// burst some number of items
for (int i = 0; i < Math.random() * 20; i++) {
s.onNext(i);
}
try {
// sleep for a random amount of time
// NOTE: Only using Thread.sleep here as an artificial demo.
Thread.sleep((long) (Math.random() * 1000));
} catch (Exception e) {
// do nothing
}
}
}).subscribeOn(Schedulers.newThread()); // use newThread since we are using sleep to block
}
}
[0]
[0, 1, 2, 3, 4, 5, 6, 7]
[0, 1, 2, 3, 4, 5, 6]
[0, 1, 2, 3, 4, 5, 6]
[0, 1, 2, 3, 4, 5]
[0, 1, 2, 3]
[0, 1, 2, 3]
[0, 1, 2, 3, 4]
@roman-ku
Copy link

@hzsweers Are you sure? My RX skills aren't the best but I don't they are the same.

That observable has an output every timespan regardless if there is any input. There is no debouncing behavior. Debouncing behavior means that there will be no output if there is no input, and the output only happens if after there is input and then the input stops. One operates on periodic intervals and the other operates on aperiodic intervals.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment