Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Last active December 22, 2017 10:36
Show Gist options
  • Save akarnokd/98a1a728a89e6f8dbd1efab104c2be0f to your computer and use it in GitHub Desktop.
Save akarnokd/98a1a728a89e6f8dbd1efab104c2be0f to your computer and use it in GitHub Desktop.
package pkg;
import rx.Observable;
import rx.Observable.OnSubscribe;
import rx.Observable.Transformer;
import rx.Producer;
import rx.Scheduler;
import rx.Scheduler.Worker;
import rx.Subscriber;
import rx.exceptions.MissingBackpressureException;
import rx.functions.Action0;
import rx.internal.operators.BackpressureUtils;
import rx.observers.SerializedSubscriber;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public final class ObservableConflate<T> implements OnSubscribe<T> {
final Observable<T> source;
final long timeout;
final TimeUnit unit;
final Scheduler scheduler;
public ObservableConflate(Observable<T> source, long timeout, TimeUnit unit, Scheduler scheduler) {
this.source = source;
this.timeout = timeout;
this.unit = unit;
this.scheduler = scheduler;
}
@Override
public void call(Subscriber<? super T> t) {
t = new SerializedSubscriber<>(t);
Worker worker = scheduler.createWorker();
ConflateSubscriber<T> parent = new ConflateSubscriber<>(t, timeout, unit, worker);
t.add(parent);
t.add(worker);
t.setProducer(parent.requested);
source.subscribe(parent);
}
static final class ConflateSubscriber<T> extends Subscriber<T> implements Action0 {
static final Object EMPTY = new Object();
final Subscriber<? super T> actual;
final long timeout;
final TimeUnit unit;
final Worker worker;
final AtomicReference<Object> current;
final Requested requested;
volatile boolean gate;
public ConflateSubscriber(Subscriber<? super T> actual, long timeout, TimeUnit unit, Worker worker) {
this.actual = actual;
this.timeout = timeout;
this.unit = unit;
this.worker = worker;
this.current = new AtomicReference<>(EMPTY);
this.requested = new Requested();
this.request(Long.MAX_VALUE);
}
@SuppressWarnings("unchecked")
@Override
public void onNext(T t) {
if (!gate) {
gate = true;
if (emit(t)) {
worker.schedule(this, timeout, unit);
}
} else {
current.set(t);
if (!gate) {
Object o = current.getAndSet(EMPTY);
if (o != EMPTY) {
gate = true;
if (emit((T)o)) {
worker.schedule(this, timeout, unit);
}
}
}
}
}
@Override
public void onError(Throwable e) {
actual.onError(e);
worker.unsubscribe();
}
@SuppressWarnings("unchecked")
@Override
public void onCompleted() {
Object o = current.getAndSet(EMPTY);
if (o != EMPTY) {
if (!emit((T)o)) {
return;
}
}
actual.onCompleted();
worker.unsubscribe();
}
@SuppressWarnings("unchecked")
@Override
public void call() {
Object o = current.getAndSet(EMPTY);
if (o == EMPTY) {
gate = false;
} else {
if (emit((T)o)) {
worker.schedule(this, timeout, unit);
}
}
}
boolean emit(T v) {
if (requested.get() != 0L) {
actual.onNext(v);
requested.producedOne();
return true;
}
unsubscribe();
actual.onError(new MissingBackpressureException("Could not emit value due to lack of requests"));
return false;
}
final class Requested extends AtomicLong implements Producer {
private static final long serialVersionUID = 5469053227556974007L;
@Override
public void request(long n) {
if (n > 0L) {
BackpressureUtils.getAndAddRequest(this, n);
}
else if (n < 0L) {
throw new IllegalArgumentException("n >= 0 required but it was " + n);
}
}
void producedOne() {
BackpressureUtils.produced(this, 1L);
}
}
}
}
package pkg;
import java.util.concurrent.TimeUnit;
import org.junit.Test;
import rx.Observable;
import rx.observers.AssertableSubscriber;
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;
public class ObservableConflateTest {
@Test
public void normal() {
TestScheduler scheduler = new TestScheduler();
PublishSubject<Integer> source = PublishSubject.create();
AssertableSubscriber<Integer> ts = source.compose(f -> Observable.create(new ObservableConflate<Integer>(f, 1, TimeUnit.SECONDS, scheduler)))
.test();
source.onNext(1);
ts.assertValue(1);
source.onNext(2);
ts.assertValue(1);
source.onNext(3);
ts.assertValue(1);
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
ts.assertValues(1, 3);
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
ts.assertValues(1, 3);
source.onNext(4);
ts.assertValues(1, 3, 4);
source.onNext(5);
source.onCompleted();
ts.assertResult(1, 3, 4, 5);
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
ts.assertResult(1, 3, 4, 5);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment