Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import java.util.concurrent.atomic.AtomicLong;
import rx.Observable.OnSubscribe;
import rx.*;
import rx.internal.operators.BackpressureUtils;
/**
 * An {@link OnSubscribe} that emits {@code count} consecutive integers beginning at
 * {@code start} and then completes, emitting only as many values as the subscriber
 * has requested through its {@link Producer}.
 */
public final class RxRange implements OnSubscribe<Integer> {
    /** First value to emit. */
    final int start;
    /** Number of values to emit; never negative. */
    final int count;

    /**
     * Creates a range source.
     *
     * @param start the first value to emit
     * @param count how many consecutive values to emit
     * @throws IllegalArgumentException if {@code count} is negative
     */
    public RxRange(int start, int count) {
        if (count < 0) {
            throw new IllegalArgumentException("count >= 0 required but it was " + count);
        }
        this.start = start;
        this.count = count;
    }

    /**
     * Completes the subscriber immediately for an empty range; otherwise installs a
     * {@link RangeProducer} that emits values in response to requests.
     *
     * @param t the subscriber to feed
     */
    @Override
    public void call(Subscriber<? super Integer> t) {
        if (count == 0) {
            t.onCompleted();
            return;
        }
        RangeProducer p = new RangeProducer(t, start, count);
        t.setProducer(p);
    }

    /**
     * Wraps this source into an {@link Observable}.
     *
     * @return an Observable created from this {@code OnSubscribe}
     */
    public Observable<Integer> toObservable() {
        return Observable.create(this);
    }

    /**
     * Producer that emits the range on demand. The inherited {@link AtomicLong} value
     * holds the outstanding requested amount; a {@code request} call that finds a
     * non-zero previous amount only adds to it and returns, leaving emission to the
     * call that moved the amount away from zero.
     */
    static final class RangeProducer extends AtomicLong implements Producer {
        /** Required because {@link AtomicLong} is serializable. */
        private static final long serialVersionUID = 5318571951669533517L;
        /** The subscriber receiving the values. */
        final Subscriber<? super Integer> child;
        /** Next value to emit. */
        int index;
        /** Number of values still to be emitted. */
        int remaining;

        /**
         * @param child the subscriber receiving the values
         * @param start the first value to emit
         * @param count the number of values to emit
         */
        public RangeProducer(Subscriber<? super Integer> child, int start, int count) {
            this.child = child;
            this.index = start;
            this.remaining = count;
        }

        /**
         * Records {@code n} additional requested items and, if no amount was outstanding
         * before this call, emits values until the outstanding amount reaches zero, the
         * range is exhausted, or the child unsubscribes.
         *
         * @param n the number of additional items requested; zero is a no-op
         * @throws IllegalArgumentException if {@code n} is negative
         */
        @Override
        public void request(long n) {
            if (n < 0) {
                throw new IllegalArgumentException("n >= 0 required but it was " + n);
            }
            if (n == 0) {
                return;
            }
            if (BackpressureUtils.getAndAddRequest(this, n) != 0) {
                return;
            }
            long r = n;
            for (;;) {
                if (child.isUnsubscribed()) {
                    return;
                }
                int i = index;
                int k = remaining;
                int e = 0;
                while (r > 0 && k > 0) {
                    child.onNext(i);
                    if (child.isUnsubscribed()) {
                        return;
                    }
                    k--;
                    if (k == 0) {
                        child.onCompleted();
                        // Terminal state: leave immediately. Without this return the
                        // outer loop would spin forever whenever more was requested
                        // than the range contains (nothing left to emit, so the
                        // outstanding amount never reaches zero). The amount is
                        // deliberately left non-zero so later request() calls return
                        // at the getAndAddRequest check above.
                        return;
                    }
                    e++;
                    i++;
                    r--;
                }
                index = i;
                remaining = k;
                // Subtract what was emitted; a non-zero result means more was
                // requested while emitting, so go around again.
                r = addAndGet(-e);
                if (r == 0) {
                    return;
                }
            }
        }
    }

    /**
     * Demo: prints the first five values of the range 1..10 followed by "Done".
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Observable<Integer> range = new RxRange(1, 10).toObservable();
        range.take(5).subscribe(
            System.out::println,
            Throwable::printStackTrace,
            () -> System.out.println("Done")
        );
    }
}
@cquludajun

This comment has been minimized.

Copy link

cquludajun commented Nov 27, 2018

It looks like you forgot to exit the for loop at line 54 when k == 0.
something like this:
if (k == 0) {
child.onCompleted();
return;
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.