Skip to content

Instantly share code, notes, and snippets.

@ojplg
Created April 15, 2019 23:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ojplg/7d36380e469997c9090dd079e0d7c0a5 to your computer and use it in GitHub Desktop.
Save ojplg/7d36380e469997c9090dd079e0d7c0a5 to your computer and use it in GitHub Desktop.
HrormStreamSelector
package org.hrorm;
import java.util.Spliterators;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
/**
 * Exposes the results of a {@link Dao} query as a sequential {@link Stream}.
 *
 * <p>Each call to {@link #select(Where)} starts a background thread that runs
 * {@code dao.foldingSelect(...)} and hands entities to the stream one at a
 * time: the background thread does not continue past an entity until the
 * stream has taken it.
 *
 * <p>If the stream is not consumed to the end (for example after
 * {@code findFirst()} or {@code limit(n)}), callers should {@code close()} it
 * (e.g. with try-with-resources) so the background thread is released.
 *
 * @param <ENTITY> the type of entity produced by the underlying {@code Dao}
 */
public class StreamSelector<ENTITY> {

    private final Dao<ENTITY> dao;

    /**
     * @param entityDao the DAO whose {@code foldingSelect} supplies the stream elements
     */
    public StreamSelector(Dao<ENTITY> entityDao){
        this.dao = entityDao;
    }

    /**
     * Starts the query on a background thread and returns a sequential stream
     * over its results.
     *
     * @param where the selection criteria, passed unchanged to {@code dao.foldingSelect}
     * @return a non-parallel stream of the selected entities; closing it
     *         cancels the background hand-off
     */
    public Stream<ENTITY> select(Where where){
        DaoEnclosingSpliterator<ENTITY> spliterator = new DaoEnclosingSpliterator<>(dao, where);
        spliterator.start();
        return StreamSupport.stream(spliterator, false).onClose(spliterator::cancel);
    }

    /**
     * Spliterator fed by a producer thread through a single-element slot.
     *
     * <p>All mutable state is guarded by {@code lock}, and every wait is a
     * loop over an explicit condition, so a notification sent before the
     * other side starts waiting is never lost and spurious wake-ups are
     * harmless.
     */
    private static class DaoEnclosingSpliterator<T> extends Spliterators.AbstractSpliterator<T> {

        private final Dao<T> dao;
        private final Where where;
        private final Object lock = new Object();
        private final Thread foldingThread;

        // ---- all fields below are guarded by 'lock' ----

        /** The entity currently offered to the consumer; meaningful only while itemReady is true. */
        private T next = null;
        /** True while 'next' holds an entity the consumer has not yet taken. */
        private boolean itemReady = false;
        /** True once the producer thread has left foldingSelect (normally or not). */
        private boolean finished = false;
        /** True once the owning stream has been closed. */
        private boolean cancelled = false;
        /** Exception or error that terminated the fold, or null. */
        private Throwable failure = null;

        DaoEnclosingSpliterator(Dao<T> dao, Where where){
            // Long.MAX_VALUE is the documented estimate for "size unknown".
            super(Long.MAX_VALUE, 0);
            this.dao = dao;
            this.where = where;
            foldingThread = new Thread(this::doFolds);
            // An abandoned, never-closed stream must not keep the JVM alive.
            foldingThread.setDaemon(true);
        }

        /** Starts the producer thread. */
        public void start(){
            foldingThread.start();
        }

        /**
         * Marks the stream as closed and wakes both sides. A producer blocked
         * in (or later entering) the hand-off aborts the fold by throwing.
         */
        void cancel(){
            synchronized (lock) {
                cancelled = true;
                lock.notifyAll();
            }
        }

        /** Producer thread body: runs the fold, then signals completion or failure. */
        private void doFolds(){
            BiFunction<Boolean, T, Boolean> process = this::handOff;
            Throwable problem = null;
            try {
                // The Boolean accumulator is a placeholder; only the per-entity
                // callback matters here, so the fold result is ignored.
                dao.foldingSelect(Boolean.FALSE, process, where);
            } catch (RuntimeException | Error ex) {
                problem = ex;
            } finally {
                synchronized (lock) {
                    // After cancellation the abort exception is expected; nobody is listening.
                    if (!cancelled) {
                        failure = problem;
                    }
                    finished = true;
                    lock.notifyAll();
                }
            }
        }

        /**
         * Fold callback: publishes one entity and blocks until the consumer
         * has taken it or the stream has been closed.
         *
         * @return {@code flag}, unchanged
         * @throws IllegalStateException if the stream was closed or this thread
         *         was interrupted; this propagates out of {@code foldingSelect}
         *         to stop the fold. NOTE(review): assumes foldingSelect releases
         *         its resources when the callback throws; confirm against Dao.
         */
        private Boolean handOff(Boolean flag, T entity){
            if (entity == null) {
                // Null entities are never passed to the consumer's action.
                return flag;
            }
            synchronized (lock) {
                next = entity;
                itemReady = true;
                lock.notifyAll();
                while (itemReady && !cancelled) {
                    try {
                        lock.wait();
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(
                                "Interrupted while handing an entity to the stream", ex);
                    }
                }
                if (cancelled) {
                    next = null;
                    itemReady = false;
                    throw new IllegalStateException(
                            "Stream was closed before the query finished");
                }
            }
            return flag;
        }

        /**
         * Blocks until the producer offers an entity or finishes.
         *
         * @return {@code true} if an entity was passed to {@code action};
         *         {@code false} once the fold has completed or the stream was closed
         * @throws IllegalStateException if the fold failed (the original
         *         problem is the cause) or the calling thread was interrupted
         */
        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            T item;
            synchronized (lock) {
                while (!itemReady && !finished && !cancelled) {
                    try {
                        lock.wait();
                    } catch (InterruptedException ex) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(
                                "Interrupted while waiting for the next entity", ex);
                    }
                }
                if (cancelled) {
                    return false;
                }
                if (!itemReady) {
                    // Producer is done; surface its failure instead of a silent short stream.
                    if (failure != null) {
                        throw new IllegalStateException(
                                "Query failed while streaming results", failure);
                    }
                    return false;
                }
                item = next;
                next = null;
                itemReady = false;
                lock.notifyAll();
            }
            // Run caller code outside the monitor.
            action.accept(item);
            return true;
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment