Skip to content

Instantly share code, notes, and snippets.

@Eternity-Yarr
Created June 16, 2016 12:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Eternity-Yarr/93dd29aa90824cb92c53494c8330a601 to your computer and use it in GitHub Desktop.
Save Eternity-Yarr/93dd29aa90824cb92c53494c8330a601 to your computer and use it in GitHub Desktop.
/**
 * Demo entry point: feeds random UUID strings into a bounded {@code OracleQueue}
 * and consumes them through an Rx pipeline whose per-element work is subscribed
 * on a 4-thread pool.
 *
 * <p>None of the executors created here are ever shut down, so the program runs
 * until the process is killed.
 *
 * <p>NOTE(review): {@code flatMap} is called without a concurrency limit — confirm
 * whether the amount of work queued on the pool is meant to be unbounded.
 *
 * @param args ignored
 */
public static void main(String[] args) {
    OracleQueue<String> someQueue = new OracleQueue<>(50);
    ExecutorService es = Executors.newFixedThreadPool(4);

    // Monitor: print the current queue depth once per second.
    Executors.newSingleThreadScheduledExecutor()
            .scheduleAtFixedRate(
                    () -> System.out.println("Queue size:" + someQueue.queue.size()),
                    1, 1, TimeUnit.SECONDS);

    // Producer: offer a fresh random UUID, waiting 1 ms between the end of one
    // offer and the start of the next.
    Executors.newSingleThreadScheduledExecutor().scheduleWithFixedDelay(
            () -> someQueue.offer(UUID.randomUUID().toString()),
            1, 1, TimeUnit.MILLISECONDS
    );

    // Consumer: each element becomes its own single-item Observable subscribed on
    // the worker pool, where one second of work is simulated.
    Observable.from(someQueue)
            .flatMap(el ->
                    Observable.just(el)
                            .subscribeOn(Schedulers.from(es))
                            .doOnNext(s -> {
                                System.out.println(s + " on " + Thread.currentThread().getName());
                                try {
                                    Thread.sleep(1000);
                                    System.out.println("Done with " + s);
                                } catch (InterruptedException e) {
                                    // Do not swallow the interruption: restore the flag so
                                    // the owning executor can observe it.
                                    Thread.currentThread().interrupt();
                                }
                            })
            )
            .subscribe(Subscribers.empty());
}
/**
 * A bounded {@link BlockingQueue} exposed as an endless {@link Iterable}.
 *
 * <p>Every iterator returned by {@link #iterator()} drains the same underlying
 * queue; {@code hasNext()} is always {@code true} and {@code next()} blocks until
 * an element is available.
 *
 * @param <T> element type
 */
private static class OracleQueue<T> implements Iterable<T> {
    private final BlockingQueue<T> queue;

    /**
     * @param queueSize maximum number of elements held at once
     */
    OracleQueue(int queueSize) {
        queue = new LinkedBlockingQueue<>(queueSize);
    }

    /**
     * Tries to enqueue {@code element}, waiting up to one second for free space.
     * The result of the timed offer is not checked, so the element is silently
     * dropped if no space becomes available in time.
     *
     * @param element the element to enqueue
     * @throws RuntimeException wrapping the {@link InterruptedException} if the
     *     calling thread is interrupted while waiting; the interrupt flag is
     *     restored before throwing
     */
    void offer(T element) {
        try {
            queue.offer(element, 1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Re-assert the interrupt so code further up the stack can still see it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns a never-ending iterator over the queue.
     *
     * @return an iterator whose {@code next()} blocks until an element can be taken
     */
    @Override
    public Iterator<T> iterator() {
        return new Iterator<T>() {
            @Override
            public boolean hasNext() {
                // The stream of elements is treated as infinite.
                return true;
            }

            @Override
            public T next() {
                try {
                    return queue.take();
                } catch (InterruptedException e) {
                    // Re-assert the interrupt so code further up the stack can still see it.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(e);
                }
            }
        };
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment