Skip to content

Instantly share code, notes, and snippets.

@akarnokd
Created April 24, 2014 21:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save akarnokd/11269808 to your computer and use it in GitHub Desktop.
Save akarnokd/11269808 to your computer and use it in GitHub Desktop.
public final class OperatorDelay2<T> implements OnSubscribe<T> {
final Observable<T> source;
final long delay;
final TimeUnit unit;
final Scheduler scheduler;
public Delay2(Observable<T> source, long delay, TimeUnit unit, Scheduler scheduler) {
this.source = source;
this.delay = delay;
this.unit = unit;
this.scheduler = scheduler;
}
@Override
public void call(Subscriber<? super T> child) {
final Worker w = scheduler.createWorker();
child.add(w);
Observable.concat(source.map(new Func1<T, Observable<T>>() {
@Override
public Observable<T> call(final T x) {
final AsyncSubject<T> result = AsyncSubject.create();
w.schedule(new Action0() {
@Override
public void call() {
result.onNext(x);
result.onCompleted();
}
}, delay, unit);
return result;
}
})).subscribe(child);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment