Created
November 4, 2015 17:09
-
-
Save aaronzirbes/7c2cda2c0b31ec42d6d0 to your computer and use it in GitHub Desktop.
An observable queue that you can push to for deferred processing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.LinkedBlockingQueue | |
import java.util.concurrent.TimeUnit | |
import rx.Observable | |
import rx.Subscriber | |
import rx.schedulers.Schedulers | |
/**
 * A queue that callers push items onto with {@link #add} and that is consumed
 * as an RxJava {@link Observable}.
 *
 * The wrapped observable is exposed through {@code @Delegate}, so Observable
 * methods such as {@code subscribe} can be called directly on this object.
 * Once subscribed, the emitting loop polls the queue and forwards every item
 * to the subscriber until that subscriber unsubscribes.
 *
 * Only the most recent subscriber is remembered; {@link #unsubscribe} and
 * {@link #isUnsubscribed} act on that one.
 */
class ObservableQueue<T> {

    /** Seconds a single poll of the queue waits before the subscription is re-checked. */
    protected long pollTimeout = 1

    /** Observable fed from {@link #queue}; replaced by {@link #async} and {@link #throttled}. */
    @Delegate
    protected Observable observable

    /** Holds items that have been added but not yet emitted. */
    protected LinkedBlockingQueue<T> queue

    /**
     * Subscriber captured when the observable is subscribed to; null until then.
     * Volatile because it is assigned on the subscribing thread (which {@link #async}
     * moves to the io scheduler) and read by whichever thread calls
     * {@link #unsubscribe} or {@link #isUnsubscribed}.
     */
    protected volatile Subscriber subscriber

    ObservableQueue() {
        this.queue = new LinkedBlockingQueue<T>()
        this.observable = getObservableForQueue()
    }

    /**
     * Makes both subscription and observation happen on the io scheduler.
     *
     * @return this queue, for chaining
     */
    ObservableQueue<T> async() {
        observable = observable.observeOn(Schedulers.io())
                .subscribeOn(Schedulers.io())
        return this
    }

    /**
     * Applies {@code throttleLast} to the observable with the given window.
     *
     * @param timeUnit unit of {@code time}
     * @param time     length of the throttling window
     * @return this queue, for chaining
     */
    ObservableQueue<T> throttled(TimeUnit timeUnit, long time) {
        observable = observable.throttleLast(time, timeUnit)
        return this
    }

    /**
     * Sets how long each poll of the queue waits for an item.
     *
     * @param seconds poll timeout in seconds
     * @return this queue, for chaining
     */
    ObservableQueue<T> withQueuePollTimeout(long seconds) {
        this.pollTimeout = seconds
        return this
    }

    /**
     * Misspelled original name of {@link #withQueuePollTimeout}, kept so
     * existing callers keep working.
     *
     * @param seconds poll timeout in seconds
     * @return this queue, for chaining
     * @deprecated use {@link #withQueuePollTimeout}
     */
    @Deprecated
    ObservableQueue<T> wtihQueuePollTimeout(long seconds) {
        return withQueuePollTimeout(seconds)
    }

    /**
     * Unsubscribes the captured subscriber, which ends its emitting loop.
     * Does nothing when nothing has subscribed yet.
     */
    void unsubscribe() {
        // Safe navigation: subscriber is null until the first subscription.
        subscriber?.unsubscribe()
    }

    /**
     * Reports whether the captured subscriber has unsubscribed.
     *
     * @return the subscriber's unsubscribed state, or false when nothing has
     *         subscribed yet (there is no subscription to have been cancelled)
     */
    boolean isUnsubscribed() {
        Subscriber current = subscriber
        return current != null && current.unsubscribed
    }

    /**
     * Queues an item for later emission.
     *
     * @param thing item to enqueue; the underlying LinkedBlockingQueue does not accept null
     */
    void add(T thing) {
        queue.add(thing)
    }

    /**
     * Builds the observable that drains {@link #queue}.
     *
     * On subscription the subscriber is stored in {@link #subscriber}, then the
     * queue is polled in a loop, waiting up to {@link #pollTimeout} seconds per
     * poll, and each non-null item is passed to {@code onNext}. The loop exits
     * once the subscriber is unsubscribed.
     *
     * @return an observable created from the polling loop
     */
    protected Observable getObservableForQueue() {
        return Observable.create({ Subscriber<T> downstream ->
            this.subscriber = downstream
            while (!downstream.unsubscribed) {
                T thing
                try {
                    // A timed poll (rather than take) lets the loop notice unsubscription.
                    thing = queue.poll(pollTimeout, TimeUnit.SECONDS)
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to the owning thread and stop polling.
                    Thread.currentThread().interrupt()
                    // Only report the interruption if someone is still listening.
                    if (!downstream.unsubscribed) { downstream.onError(e) }
                    return
                }
                if (thing != null) { downstream.onNext(thing) }
            }
        } as Observable.OnSubscribe<T>)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment