Last active
August 29, 2015 14:19
-
-
Save mmonti/3664d3d0c451078619ab to your computer and use it in GitHub Desktop.
Operator that delays item emission from a sequence using Observables.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* | |
*/ | |
public class ConfigurableDelay<T> implements Observable.Operator<T, T> { | |
private final Func1<T, TimeConfiguration> itemToTime; | |
private final Scheduler scheduler; | |
public ConfigurableDelay(final Func1<T, TimeConfiguration> itemToTime) { | |
this(itemToTime, Schedulers.computation()); | |
} | |
public ConfigurableDelay(final Func1<T, TimeConfiguration> itemToTime, final Scheduler scheduler) { | |
this.itemToTime = itemToTime; | |
this.scheduler = scheduler; | |
} | |
@Override | |
public Subscriber<? super T> call(final Subscriber<? super T> subscriber) { | |
return new Subscriber<T>(subscriber) { | |
private TimeConfiguration nextTime = null; | |
@Override | |
public void onCompleted() { | |
subscriber.onCompleted(); | |
} | |
@Override | |
public void onError(final Throwable e) { | |
subscriber.onError(e); | |
} | |
@Override | |
public void onNext(final T item) { | |
TimeConfiguration previousNextTime = nextTime; | |
this.nextTime = itemToTime.call(item); | |
if (previousNextTime == null) { | |
subscriber.onNext(item); | |
} else { | |
scheduler.createWorker().schedule(new Action0() { | |
@Override | |
public void call() { | |
subscriber.onNext(item); | |
} | |
}, previousNextTime.getTime(), previousNextTime.getUnit()); | |
} | |
} | |
}; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Client code...
// Every message requests the same pause, so each message after the first is held back
// for two seconds.
// NOTE(review): typed on MailMessage on the assumption that from(mailMessages) yields
// Observable<MailMessage>, as the Action1<MailMessage> below implies - confirm.
final Func1<MailMessage, TimeConfiguration> timeConfiguration =
        new Func1<MailMessage, TimeConfiguration>() {
            @Override
            public TimeConfiguration call(final MailMessage message) {
                return new TimeConfiguration(2L, TimeUnit.SECONDS);
            }
        };

from(mailMessages)
        // Parameterized instead of raw so the rest of the chain keeps its element type.
        .lift(new ConfigurableDelay<MailMessage>(timeConfiguration, Schedulers.immediate()))
        .subscribe(new Action1<MailMessage>() {
            @Override
            public void call(final MailMessage currentMailMessage) {
                dispatch(dispatchResult, currentMailMessage);
            }
        });
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Immutable pairing of a duration amount with the unit it is expressed in.
 */
public class TimeConfiguration {

    private final long time;
    private final TimeUnit unit;

    /**
     * Creates a configuration.
     *
     * @param time the amount of time, expressed in {@code unit}
     * @param unit the unit of {@code time}
     * @throws NullPointerException if {@code time} or {@code unit} is {@code null}
     */
    public TimeConfiguration(final Long time, final TimeUnit unit) {
        // Reject nulls here; otherwise a null time only surfaces later as an unboxing NPE
        // inside getTime(), far from the faulty call site.
        if (time == null) {
            throw new NullPointerException("time");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        this.time = time;
        this.unit = unit;
    }

    /**
     * @return the amount of time, expressed in {@link #getUnit()}
     */
    public long getTime() {
        return time;
    }

    /**
     * @return the unit in which {@link #getTime()} is expressed; never {@code null}
     */
    public TimeUnit getUnit() {
        return unit;
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment