Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Delayed unsubscription operator for RxJava (done in the computation scheduler)
public static class DelayedUnsubscription<T> implements Operator<T, T> {
final private long time;
final private TimeUnit unit;
public DelayedUnsubscription(final long time, final TimeUnit unit) {
this.time = time;
this.unit = unit;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
final Subscriber<T> transformed = new Subscriber<T>(subscriber, false) {
@Override
public void onCompleted() {
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
@Override
public void onError(final Throwable e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}
@Override
public void onNext(final T t) {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext(t);
}
}
};
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
Schedulers.computation().createWorker().schedule(new Action0() {
@Override
public void call() {
transformed.unsubscribe();
}
}, time, unit);
}
}));
transformed.add(subscriber);
return transformed;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.