Skip to content

Instantly share code, notes, and snippets.

@evacchi
Last active March 16, 2016 14:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save evacchi/ae362310dec8aacfdef4 to your computer and use it in GitHub Desktop.
Save evacchi/ae362310dec8aacfdef4 to your computer and use it in GitHub Desktop.

At some point, the first subscriber in the Reactor version below (the one printing "1) ...") fails with:

2016-03-16 12:01:30 ERROR r.c.p.SchedulerGroup:477 - Unrouted exception
reactor.core.util.Exceptions$UpstreamException: java.lang.IllegalStateException: Queue is full?!
	at reactor.core.util.Exceptions.failUpstream(Exceptions.java:106)
	at reactor.core.util.Exceptions.onErrorDropped(Exceptions.java:153)
	at reactor.core.subscriber.ConsumerSubscriber.doError(ConsumerSubscriber.java:141)
	at reactor.core.subscriber.ConsumerSubscriber.onError(ConsumerSubscriber.java:128)
	at reactor.core.publisher.FluxMap$MapSubscriber.onError(FluxMap.java:137)
	at reactor.core.publisher.FluxDispatchOn$DispatchOnSubscriber.checkTerminated(FluxDispatchOn.java:493)
	at reactor.core.publisher.FluxDispatchOn$DispatchOnSubscriber.runAsync(FluxDispatchOn.java:417)
	at reactor.core.publisher.FluxDispatchOn$DispatchOnSubscriber.run(FluxDispatchOn.java:476)
	at reactor.core.publisher.SchedulerGroup$TaskSubscriber.onNext(SchedulerGroup.java:1081)
	at reactor.core.publisher.SchedulerGroup$TaskSubscriber.onNext(SchedulerGroup.java:1063)
	at reactor.core.publisher.TopicProcessor$TopicSubscriberLoop.run(TopicProcessor.java:877)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalStateException: Queue is full?!
	at reactor.core.publisher.FluxDispatchOn$DispatchOnSubscriber.onNext(FluxDispatchOn.java:259)
	at reactor.core.publisher.FluxLatest$LatestSubscriber.drain(FluxLatest.java:177)
	at reactor.core.publisher.FluxLatest$LatestSubscriber.request(FluxLatest.java:97)
	at reactor.core.publisher.FluxDispatchOn$DispatchOnSubscriber.runAsync(FluxDispatchOn.java:432)
	... 7 more
package rxplayground;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SchedulerGroup;
import java.util.Iterator;
import java.util.function.Function;
/**
 * Demo: two subscribers with different per-element costs consume one shared,
 * never-ending {@code Flux<Long>}.
 *
 * <p>The source is wrapped with {@link #Shared()} and each subscriber
 * additionally applies {@link #DropOld()} before its own (artificially slow)
 * mapping step.
 *
 * <p>Created by evacchi on 16/03/16.
 */
public class ReactorPublish {

    /** Artificial per-element delay of the "fast" subscriber, in milliseconds. */
    private static final long FAST_DELAY_MILLIS = 200;

    /** Artificial per-element delay of the "slow" subscriber, in milliseconds. */
    private static final long SLOW_DELAY_MILLIS = 500;

    /**
     * Endless iterator producing 0, 1, 2, ...; {@link #hasNext()} never returns
     * {@code false}.
     */
    static class Iter implements Iterator<Long> {
        long count = 0;

        @Override
        public boolean hasNext() {
            return true;
        }

        @Override
        public Long next() {
            return count++;
        }
    }

    /**
     * The shared source: a fresh {@link Iter} per iteration request, wrapped by
     * {@link #Shared()}. Despite the field name, the elements are {@code Long}s.
     */
    final Flux<Long> doubleFlux = Flux.fromIterable(Iter::new).as(Shared());

    /**
     * Attaches both subscribers to the shared source. Subscriber "1)" prints to
     * standard output after the fast computation; subscriber "2)" prints to
     * standard error after the slow computation.
     */
    void stream() {
        doubleFlux.as(DropOld())
                .map(this::fastComputation)
                .consume(i -> System.out.println("1) " + i));
        doubleFlux.as(DropOld())
                .map(this::slowComputation)
                .consume(i -> System.err.println("2) " + i));
    }

    /**
     * Returns a transformation that applies {@code onBackpressureLatest()} and
     * then {@code dispatchOn(SchedulerGroup.single(), 1)}.
     *
     * <p>NOTE(review): judging by the name, the intent is to keep only the
     * newest element while the subscriber is busy. The gist reports that this
     * pipeline eventually fails with "Queue is full?!"; the cause is not visible
     * from this file and should be confirmed against the Reactor version in use.
     *
     * @param <T> element type of the stream
     * @return the transformation, suitable for {@code Flux.as(...)}
     */
    public static <T> Function<Flux<T>, Flux<T>> DropOld() {
        return (flux) -> flux.onBackpressureLatest().dispatchOn(SchedulerGroup.single(), 1);
    }

    /**
     * Returns a transformation that applies
     * {@code publishOn(SchedulerGroup.single()).publish().refCount()}.
     *
     * @param <T> element type of the stream
     * @return the transformation, suitable for {@code Flux.as(...)}
     */
    public static <T> Function<Flux<T>, Flux<T>> Shared() {
        return (flux) -> flux.publishOn(SchedulerGroup.single()).publish().refCount();
    }

    /**
     * Simulates a cheap computation: sleeps {@value #FAST_DELAY_MILLIS} ms and
     * returns the input unchanged.
     *
     * @param in the element to pass through
     * @param <T> element type
     * @return {@code in}
     */
    <T> T fastComputation(T in) {
        return passThroughAfter(in, FAST_DELAY_MILLIS);
    }

    /**
     * Simulates an expensive computation: sleeps {@value #SLOW_DELAY_MILLIS} ms
     * and returns the input unchanged.
     *
     * @param in the element to pass through
     * @param <T> element type
     * @return {@code in}
     */
    <T> T slowComputation(T in) {
        return passThroughAfter(in, SLOW_DELAY_MILLIS);
    }

    /**
     * Sleeps for {@code millis} milliseconds, then returns {@code in}.
     *
     * <p>If the sleep is interrupted the method returns early and restores the
     * thread's interrupt status, so that the code owning the thread can still
     * observe the interruption.
     */
    private static <T> T passThroughAfter(T in, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again rather
            // than swallowing the interruption.
            Thread.currentThread().interrupt();
        }
        return in;
    }

    /**
     * Starts the demo and then parks the main thread.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        new ReactorPublish().stream();
        // Joining the current thread on itself never completes normally; this
        // keeps the process alive while the subscribers run.
        Thread.currentThread().join();
    }
}
package rxplayground;
import rx.Observable;
import rx.Scheduler;
import rx.Subscription;
import rx.schedulers.Schedulers;
import java.util.Iterator;
import java.util.function.Function;
/**
 * Demo: two subscribers with different per-element costs consume one shared,
 * never-ending {@code Observable<Long>}.
 *
 * <p>The source is wrapped with {@link #Shared()} and each subscriber
 * additionally applies {@link #Latest()} before its own (artificially slow)
 * mapping step.
 *
 * <p>Created by evacchi on 16/03/16.
 */
public class RxPublish {

    /** Artificial per-element delay of the "fast" subscriber, in milliseconds. */
    private static final long FAST_DELAY_MILLIS = 200;

    /** Artificial per-element delay of the "slow" subscriber, in milliseconds. */
    private static final long SLOW_DELAY_MILLIS = 500;

    /**
     * Endless iterator producing 0, 1, 2, ...; {@link #hasNext()} never returns
     * {@code false}.
     */
    static class Iter implements Iterator<Long> {
        long count = 0;

        @Override
        public boolean hasNext() {
            return true;
        }

        @Override
        public Long next() {
            return count++;
        }
    }

    /**
     * The shared source: a fresh {@link Iter} per iteration request, wrapped by
     * {@link #Shared()}.
     */
    final Observable<Long> shared = Observable.from(Iter::new).compose(Shared());

    /**
     * Sets the {@code rx.ring-buffer.size} system property to {@code "1"} and
     * attaches both subscribers to the shared source. Subscriber "1)" prints to
     * standard output after the fast computation; subscriber "2)" prints to
     * standard error after the slow computation.
     */
    void stream() {
        // NOTE(review): presumably RxJava reads this property only once, when its
        // ring-buffer class is initialised; if that has already happened (the
        // `shared` field is built before this method runs) the setting has no
        // effect. Confirm, and consider passing -Drx.ring-buffer.size=1 instead.
        System.setProperty("rx.ring-buffer.size", "1");
        shared.compose(RxPublish.Latest())
                .map(this::fastComputation)
                .subscribe(i -> System.out.println("1) " + i));
        shared.compose(RxPublish.Latest())
                .map(this::slowComputation)
                .subscribe(i -> System.err.println("2) " + i));
    }

    /**
     * Returns a transformer that applies
     * {@code subscribeOn(Schedulers.newThread()).publish().refCount()}.
     *
     * @param <T> element type of the stream
     * @return the transformer, suitable for {@code Observable.compose(...)}
     */
    public static <T> Observable.Transformer<T,T> Shared() {
        return (flux) -> flux.subscribeOn(Schedulers.newThread()).publish().refCount();
    }

    /**
     * Returns a transformer that applies {@code onBackpressureLatest()} and then
     * {@code observeOn(Schedulers.newThread())}.
     *
     * @param <T> element type of the stream
     * @return the transformer, suitable for {@code Observable.compose(...)}
     */
    public static <T> Observable.Transformer<T,T> Latest() {
        return (flux) -> flux.onBackpressureLatest().observeOn(Schedulers.newThread());
    }

    /**
     * Simulates a cheap computation: sleeps {@value #FAST_DELAY_MILLIS} ms and
     * returns the input unchanged.
     *
     * @param in the element to pass through
     * @param <T> element type
     * @return {@code in}
     */
    <T> T fastComputation(T in) {
        return passThroughAfter(in, FAST_DELAY_MILLIS);
    }

    /**
     * Simulates an expensive computation: sleeps {@value #SLOW_DELAY_MILLIS} ms
     * and returns the input unchanged.
     *
     * @param in the element to pass through
     * @param <T> element type
     * @return {@code in}
     */
    <T> T slowComputation(T in) {
        return passThroughAfter(in, SLOW_DELAY_MILLIS);
    }

    /**
     * Sleeps for {@code millis} milliseconds, then returns {@code in}.
     *
     * <p>If the sleep is interrupted the method returns early and restores the
     * thread's interrupt status, so that the code owning the thread can still
     * observe the interruption.
     */
    private static <T> T passThroughAfter(T in, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again rather
            // than swallowing the interruption.
            Thread.currentThread().interrupt();
        }
        return in;
    }

    /**
     * Starts the demo and then parks the main thread.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        new RxPublish().stream();
        // Joining the current thread on itself never completes normally; this
        // keeps the process alive while the subscribers run.
        Thread.currentThread().join();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment