Skip to content

Instantly share code, notes, and snippets.

@aaronzirbes
Created November 4, 2015 17:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aaronzirbes/7c2cda2c0b31ec42d6d0 to your computer and use it in GitHub Desktop.
Save aaronzirbes/7c2cda2c0b31ec42d6d0 to your computer and use it in GitHub Desktop.
An observable queue that you can push to for deferred processing
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import rx.Observable
import rx.Subscriber
import rx.schedulers.Schedulers
/**
 * A queue of items exposed as an RxJava {@code Observable}.
 *
 * Producers push items with {@link #add}; once something subscribes to the
 * underlying observable, a polling loop drains the queue and forwards each item
 * to the subscriber until that subscriber unsubscribes.
 *
 * All {@code Observable} methods are available directly on this object through
 * {@code @Delegate}.
 *
 * Only the most recent subscriber is tracked: {@link #unsubscribe} and
 * {@link #isUnsubscribed} act on the subscriber of the latest subscription.
 * Without {@link #async}, the polling loop runs on whichever thread subscribes
 * and blocks it until the subscriber unsubscribes.
 */
class ObservableQueue<T> {

    /** How long, in seconds, each poll waits for an item before re-checking the subscription. */
    protected long pollTimeout = 1

    @Delegate
    protected Observable observable

    protected LinkedBlockingQueue<T> queue

    // Written by the thread running the polling loop and read by callers of
    // unsubscribe()/isUnsubscribed(), which may be on a different thread.
    protected volatile Subscriber subscriber

    ObservableQueue() {
        this.queue = new LinkedBlockingQueue<T>()
        this.observable = getObservableForQueue()
    }

    /**
     * Moves both subscription (the polling loop) and observation onto the io scheduler.
     *
     * @return this queue, for chaining
     */
    ObservableQueue<T> async() {
        observable = observable.observeOn(Schedulers.io())
                               .subscribeOn(Schedulers.io())
        return this
    }

    /**
     * Applies {@code throttleLast} to the underlying observable.
     *
     * @param timeUnit unit of {@code time}
     * @param time     length of the throttling window
     * @return this queue, for chaining
     */
    ObservableQueue<T> throttled(TimeUnit timeUnit, long time) {
        observable = observable.throttleLast(time, timeUnit)
        return this
    }

    /**
     * Sets how long each poll of the queue waits for an item.
     *
     * @param seconds poll timeout in seconds
     * @return this queue, for chaining
     */
    ObservableQueue<T> withQueuePollTimeout(long seconds) {
        this.pollTimeout = seconds
        return this
    }

    /**
     * Misspelled original name of {@link #withQueuePollTimeout}, kept so that
     * existing callers continue to work.
     *
     * @deprecated use {@link #withQueuePollTimeout} instead
     */
    @Deprecated
    ObservableQueue<T> wtihQueuePollTimeout(long seconds) {
        return withQueuePollTimeout(seconds)
    }

    /**
     * Unsubscribes the most recent subscriber, which ends its polling loop.
     * Does nothing if nothing has subscribed yet.
     */
    void unsubscribe() {
        subscriber?.unsubscribe()
    }

    /**
     * @return {@code true} if the most recent subscriber has unsubscribed;
     *         {@code false} if it is still subscribed or nothing has subscribed yet
     */
    boolean isUnsubscribed() {
        Subscriber current = subscriber
        return current != null && current.unsubscribed
    }

    /**
     * Enqueues an item for delivery to the subscriber.
     *
     * @param thing the item to enqueue
     */
    void add(T thing) {
        queue.add(thing)
    }

    /**
     * Builds the observable whose subscription drains the queue.
     *
     * On subscribe, the subscriber is remembered and the queue is polled in a
     * loop; every item received is passed to {@code onNext}. The poll timeout
     * bounds how long the loop can go without noticing an unsubscribe.
     */
    protected Observable getObservableForQueue() {
        return Observable.create({ Subscriber<T> downstream ->
            this.subscriber = downstream
            try {
                while (!downstream.unsubscribed) {
                    T thing = queue.poll(pollTimeout, TimeUnit.SECONDS)
                    if (thing != null) { downstream.onNext(thing) }
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the code that owns this thread.
                Thread.currentThread().interrupt()
                // An interrupt that arrives after unsubscribe is just the loop being
                // stopped; only report it when the subscriber is still listening.
                if (!downstream.unsubscribed) { downstream.onError(e) }
            }
        } as Observable.OnSubscribe<T>)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment