Skip to content

Instantly share code, notes, and snippets.

@mmonti
Last active August 29, 2015 14:19
Show Gist options
  • Save mmonti/3664d3d0c451078619ab to your computer and use it in GitHub Desktop.
Save mmonti/3664d3d0c451078619ab to your computer and use it in GitHub Desktop.
Operation to "delay" item emission from a sequence using Observables.
/**
*
*/
public class ConfigurableDelay<T> implements Observable.Operator<T, T> {
private final Func1<T, TimeConfiguration> itemToTime;
private final Scheduler scheduler;
public ConfigurableDelay(final Func1<T, TimeConfiguration> itemToTime) {
this(itemToTime, Schedulers.computation());
}
public ConfigurableDelay(final Func1<T, TimeConfiguration> itemToTime, final Scheduler scheduler) {
this.itemToTime = itemToTime;
this.scheduler = scheduler;
}
@Override
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) {
return new Subscriber<T>(subscriber) {
private TimeConfiguration nextTime = null;
@Override
public void onCompleted() {
subscriber.onCompleted();
}
@Override
public void onError(final Throwable e) {
subscriber.onError(e);
}
@Override
public void onNext(final T item) {
TimeConfiguration previousNextTime = nextTime;
this.nextTime = itemToTime.call(item);
if (previousNextTime == null) {
subscriber.onNext(item);
} else {
scheduler.createWorker().schedule(new Action0() {
@Override
public void call() {
subscriber.onNext(item);
}
}, previousNextTime.getTime(), previousNextTime.getUnit());
}
}
};
}
}
// Client code...
Func1<Object, TimeConfiguration> timeConfiguration = new Func1() {
@Override
public TimeConfiguration call(Object o) {
return new TimeConfiguration(2L, TimeUnit.SECONDS);
}
};
from(mailMessages)
.lift(new ConfigurableDelay(timeConfiguration, Schedulers.immediate()))
.subscribe(new Action1<MailMessage>() {
@Override
public void call(MailMessage currentMailMessage) {
dispatch(dispatchResult, currentMailMessage);
}
});
/**
*
*/
public class TimeConfiguration {
private Long time = null;
private TimeUnit unit = null;
public TimeConfiguration(final Long time, final TimeUnit unit) {
this.time = time;
this.unit = unit;
}
public long getTime() {
return time;
}
public TimeUnit getUnit() {
return unit;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment