import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
import rx.Observable; | |
import rx.Subscriber; | |
import rx.schedulers.Schedulers; | |
public class DebounceBuffer { | |
public static void main(String args[]) { | |
// debounce to the last value in each burst | |
// intermittentBursts().debounce(10, TimeUnit.MILLISECONDS).toBlocking().forEach(System.out::println); | |
/* The following will emit a buffered list as it is debounced */ | |
// first we multicast the stream ... using refCount so it handles the subscribe/unsubscribe | |
Observable<Integer> burstStream = intermittentBursts().take(20).publish().refCount(); | |
// then we get the debounced version | |
Observable<Integer> debounced = burstStream.debounce(10, TimeUnit.MILLISECONDS); | |
// then the buffered one that uses the debounced stream to demark window start/stop | |
Observable<List<Integer>> buffered = burstStream.buffer(debounced); | |
// then we subscribe to the buffered stream so it does what we want | |
buffered.toBlocking().forEach(System.out::println); | |
} | |
/** | |
* This is an artificial source to demonstrate an infinite stream that bursts intermittently | |
*/ | |
public static Observable<Integer> intermittentBursts() { | |
return Observable.create((Subscriber<? super Integer> s) -> { | |
while (!s.isUnsubscribed()) { | |
// burst some number of items | |
for (int i = 0; i < Math.random() * 20; i++) { | |
s.onNext(i); | |
} | |
try { | |
// sleep for a random amount of time | |
// NOTE: Only using Thread.sleep here as an artificial demo. | |
Thread.sleep((long) (Math.random() * 1000)); | |
} catch (Exception e) { | |
// do nothing | |
} | |
} | |
}).subscribeOn(Schedulers.newThread()); // use newThread since we are using sleep to block | |
} | |
} |
import java.util.List; | |
import java.util.concurrent.TimeUnit; | |
import rx.Observable; | |
import rx.Subscriber; | |
import rx.schedulers.Schedulers; | |
/** | |
* Variant of above that uses `publish(Func1<? super Observable<T>, ? extends Observable<R>> selector)` | |
* which allows multicasting without the need to use `refcount()` which can result in race conditions. | |
* */ | |
public class DebounceBufferPublish { | |
public static void main(String args[]) { | |
/* The following will emit a buffered list as it is debounced */ | |
Observable<List<Integer>> buffered = intermittentBursts().take(20).publish(stream -> { | |
// inside the `publish` function we can access `stream` in a multicasted manner | |
return stream.buffer(stream.debounce(10, TimeUnit.MILLISECONDS)); | |
}); | |
buffered.toBlocking().forEach(System.out::println); | |
} | |
/** | |
* This is an artificial source to demonstrate an infinite stream that bursts intermittently | |
*/ | |
public static Observable<Integer> intermittentBursts() { | |
return Observable.create((Subscriber<? super Integer> s) -> { | |
while (!s.isUnsubscribed()) { | |
// burst some number of items | |
for (int i = 0; i < Math.random() * 20; i++) { | |
s.onNext(i); | |
} | |
try { | |
// sleep for a random amount of time | |
// NOTE: Only using Thread.sleep here as an artificial demo. | |
Thread.sleep((long) (Math.random() * 1000)); | |
} catch (Exception e) { | |
// do nothing | |
} | |
} | |
}).subscribeOn(Schedulers.newThread()); // use newThread since we are using sleep to block | |
} | |
} |
[0] | |
[0, 1, 2, 3, 4, 5, 6, 7] | |
[0, 1, 2, 3, 4, 5, 6] | |
[0, 1, 2, 3, 4, 5, 6] | |
[0, 1, 2, 3, 4, 5] | |
[0, 1, 2, 3] | |
[0, 1, 2, 3] | |
[0, 1, 2, 3, 4] |
This comment has been minimized.
This comment has been minimized.
The race condition is when the two consumers subscribe. Often on a hot stream it doesn't matter when subscribers come and go, and `refCount()` is fine for that. To ensure all subscribers start at exactly the same time and get the exact same values, use `publish(selector)` instead. Often on the buffered debounce it won't matter if there is a race, but it's worth understanding the tradeoff, and since the `publish(selector)` variant avoids the race, it is the safer choice.
This comment has been minimized.
This comment has been minimized.
awesome! that makes perfect sense. |
This comment has been minimized.
This comment has been minimized.
Vnice |
This comment has been minimized.
This comment has been minimized.
What's the benefit of this over the |
This comment has been minimized.
This comment has been minimized.
@hzsweers +1 |
This comment has been minimized.
This comment has been minimized.
@hzsweers Are you sure? My RX skills aren't the best but I don't think they are the same. That observable has an output every |
This comment has been minimized.
The `.publish` via the Function param is definitely a cleaner API in the second variant.

I'm trying to better understand a situation where this race condition could happen in the first `DebounceBuffer` example.

`burstStream` is essentially shareable (courtesy of `publish().refCount()`). Does this mean that the stream is essentially the same, but the emission times could potentially differ between `debounced` and `buffered`?

This intro to Rx article says that `refCount` helps "avoid" the race condition (but I could be taking that out of context with regard to this particular example).