Last active
August 29, 2015 14:06
-
-
Save tkroman/09f7e5bce42574844bee to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.*; | |
import java.util.function.*; | |
import java.util.stream.*; | |
import org.neo4j.graphdb.GraphDatabaseService; | |
import org.neo4j.graphdb.Transaction; | |
/** | |
* Delegates all operations to an underlying Stream, but wraps them in transactions. | |
* Transaction is wrapped around the whole method call (i.e. map(), forEach() etc.), | |
* so may become potentially heavy. | |
*/ | |
public class TxEnclosingStream<T> implements Stream<T> { | |
private final Stream<T> stream; | |
private final GraphDatabaseService db; | |
public TxEnclosingStream(final Stream<T> stream, final GraphDatabaseService db) { | |
this.stream = stream; | |
this.db = db; | |
} | |
public TxEnclosingStream(final Iterator<T> stream, final GraphDatabaseService db) { | |
this.stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(stream, Spliterator.NONNULL), false); | |
this.db = db; | |
} | |
private <S> TxEnclosingStream<S> newLikeThis(final Stream<S> underlying) { | |
return new TxEnclosingStream<S>(underlying, db); | |
} | |
public Stream<T> filter(Predicate<? super T> predicate) { | |
try (final Transaction tx = db.beginTx()) { | |
final TxEnclosingStream<T> s = newLikeThis(stream.filter(predicate)); | |
tx.success(); | |
return s; | |
} | |
} | |
public <R> Stream<R> map(Function<? super T, ? extends R> mapper) { | |
try (final Transaction tx = db.beginTx()) { | |
final TxEnclosingStream<R> s = newLikeThis(stream.map(mapper)); | |
tx.success(); | |
return s; | |
} | |
} | |
public IntStream mapToInt(ToIntFunction<? super T> mapper) { | |
try (final Transaction tx = db.beginTx()) { | |
final IntStream s = stream.mapToInt(mapper); | |
tx.success(); | |
return s; | |
} | |
} | |
public LongStream mapToLong(ToLongFunction<? super T> mapper) { | |
try (final Transaction tx = db.beginTx()) { | |
final LongStream s = stream.mapToLong(mapper); | |
tx.success(); | |
return s; | |
} | |
} | |
public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) { | |
try (final Transaction tx = db.beginTx()) { | |
final DoubleStream s = stream.mapToDouble(mapper); | |
tx.success(); | |
return s; | |
} | |
} | |
public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) { | |
try (final Transaction tx = db.beginTx()) { | |
final TxEnclosingStream<R> s = newLikeThis(stream.flatMap(mapper)); | |
tx.success(); | |
return s; | |
} | |
} | |
public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) { | |
try (final Transaction tx = db.beginTx()) { | |
final IntStream s = stream.flatMapToInt(mapper); | |
tx.success(); | |
return s; | |
} | |
} | |
public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) { | |
try (final Transaction tx = db.beginTx()) { | |
final LongStream s = stream.flatMapToLong(mapper); | |
tx.success(); | |
return s; | |
} | |
} | |
public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) { | |
try (final Transaction tx = db.beginTx()) { | |
final DoubleStream s = stream.flatMapToDouble(mapper); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public TxEnclosingStream<T> distinct() { | |
try (final Transaction tx = db.beginTx()) { | |
final TxEnclosingStream<T> s = newLikeThis(stream.distinct()); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public TxEnclosingStream<T> sorted() { | |
try (final Transaction tx = db.beginTx()) { | |
final TxEnclosingStream<T> s = newLikeThis(stream.sorted()); | |
tx.success(); | |
return s; | |
} | |
} | |
public Stream<T> sorted(Comparator<? super T> comparator) { | |
try (final Transaction tx = db.beginTx()) { | |
final TxEnclosingStream<T> s = newLikeThis(stream.sorted(comparator)); | |
tx.success(); | |
return s; | |
} | |
} | |
public Stream<T> peek(Consumer<? super T> action) { | |
try (final Transaction tx = db.beginTx()) { | |
final TxEnclosingStream<T> s = newLikeThis(stream.peek(action)); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public TxEnclosingStream<T> limit(long maxSize) { | |
try (final Transaction tx = db.beginTx()) { | |
final TxEnclosingStream<T> s = newLikeThis(stream.limit(maxSize)); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public TxEnclosingStream<T> skip(long n) { | |
try (final Transaction tx = db.beginTx()) { | |
final TxEnclosingStream<T> s = newLikeThis(stream.skip(n)); | |
tx.success(); | |
return s; | |
} | |
} | |
public void forEach(Consumer<? super T> action) { | |
try (final Transaction tx = db.beginTx()) { | |
stream.forEach(action); | |
tx.success(); | |
} | |
} | |
public void forEachOrdered(Consumer<? super T> action) { | |
try (final Transaction tx = db.beginTx()) { | |
stream.forEachOrdered(action); | |
tx.success(); | |
} | |
} | |
@Override | |
public Object[] toArray() { | |
try (final Transaction tx = db.beginTx()) { | |
Object[] s = stream.toArray(); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public <A> A[] toArray(IntFunction<A[]> generator) { | |
try (final Transaction tx = db.beginTx()) { | |
A[] s = stream.toArray(generator); | |
tx.success(); | |
return s; | |
} | |
} | |
public T reduce(T identity, BinaryOperator<T> accumulator) { | |
try (final Transaction tx = db.beginTx()) { | |
T s = stream.reduce(identity, accumulator); | |
tx.success(); | |
return s; | |
} | |
} | |
public Optional<T> reduce(BinaryOperator<T> accumulator) { | |
try (final Transaction tx = db.beginTx()) { | |
Optional<T> s = stream.reduce(accumulator); | |
tx.success(); | |
return s; | |
} | |
} | |
public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) { | |
try (final Transaction tx = db.beginTx()) { | |
U s = stream.reduce(identity, accumulator, combiner); | |
tx.success(); | |
return s; | |
} | |
} | |
public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) { | |
try (final Transaction tx = db.beginTx()) { | |
R s = stream.collect(supplier, accumulator, combiner); | |
tx.success(); | |
return s; | |
} | |
} | |
public <R, A> R collect(Collector<? super T, A, R> collector) { | |
try (final Transaction tx = db.beginTx()) { | |
R s = stream.collect(collector); | |
tx.success(); | |
return s; | |
} | |
} | |
public Optional<T> min(Comparator<? super T> comparator) { | |
try (final Transaction tx = db.beginTx()) { | |
Optional<T> s = stream.min(comparator); | |
tx.success(); | |
return s; | |
} | |
} | |
public Optional<T> max(Comparator<? super T> comparator) { | |
try (final Transaction tx = db.beginTx()) { | |
Optional<T> s = stream.max(comparator); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public long count() { | |
try (final Transaction tx = db.beginTx()) { | |
long s = stream.count(); | |
tx.success(); | |
return s; | |
} | |
} | |
public boolean anyMatch(Predicate<? super T> predicate) { | |
try (final Transaction tx = db.beginTx()) { | |
boolean s = stream.anyMatch(predicate); | |
tx.success(); | |
return s; | |
} | |
} | |
public boolean allMatch(Predicate<? super T> predicate) { | |
try (final Transaction tx = db.beginTx()) { | |
boolean s = stream.allMatch(predicate); | |
tx.success(); | |
return s; | |
} | |
} | |
public boolean noneMatch(Predicate<? super T> predicate) { | |
try (final Transaction tx = db.beginTx()) { | |
boolean s = stream.noneMatch(predicate); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public Optional<T> findFirst() { | |
try (final Transaction tx = db.beginTx()) { | |
Optional<T> s = stream.findFirst(); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public Optional<T> findAny() { | |
try (final Transaction tx = db.beginTx()) { | |
Optional<T> s = stream.findAny(); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public Iterator<T> iterator() { | |
try (final Transaction tx = db.beginTx()) { | |
Iterator<T> s = stream.iterator(); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public Spliterator<T> spliterator() { | |
try (final Transaction tx = db.beginTx()) { | |
Spliterator<T> s = stream.spliterator(); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public boolean isParallel() { | |
try (final Transaction tx = db.beginTx()) { | |
boolean s = stream.isParallel(); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public TxEnclosingStream<T> sequential() { | |
try (final Transaction tx = db.beginTx()) { | |
final TxEnclosingStream<T> s = newLikeThis(stream.sequential()); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public TxEnclosingStream<T> parallel() { | |
try (final Transaction tx = db.beginTx()) { | |
final TxEnclosingStream<T> s = newLikeThis(stream.parallel()); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public TxEnclosingStream<T> unordered() { | |
try (final Transaction tx = db.beginTx()) { | |
final TxEnclosingStream<T> s = newLikeThis(stream.unordered()); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public TxEnclosingStream<T> onClose(Runnable closeHandler) { | |
try (final Transaction tx = db.beginTx()) { | |
final TxEnclosingStream<T> s = newLikeThis(stream.onClose(closeHandler)); | |
tx.success(); | |
return s; | |
} | |
} | |
@Override | |
public void close() { | |
stream.close(); } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment