Skip to content

Instantly share code, notes, and snippets.

@tkroman
Last active August 29, 2015 14:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tkroman/09f7e5bce42574844bee to your computer and use it in GitHub Desktop.
Save tkroman/09f7e5bce42574844bee to your computer and use it in GitHub Desktop.
import java.util.*;
import java.util.function.*;
import java.util.stream.*;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
/**
* Delegates all operations to an underlying Stream, but wraps them in transactions.
* Transaction is wrapped around the whole method call (i.e. map(), forEach() etc.),
* so may become potentially heavy.
*/
public class TxEnclosingStream<T> implements Stream<T> {
private final Stream<T> stream;
private final GraphDatabaseService db;
public TxEnclosingStream(final Stream<T> stream, final GraphDatabaseService db) {
this.stream = stream;
this.db = db;
}
public TxEnclosingStream(final Iterator<T> stream, final GraphDatabaseService db) {
this.stream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(stream, Spliterator.NONNULL), false);
this.db = db;
}
private <S> TxEnclosingStream<S> newLikeThis(final Stream<S> underlying) {
return new TxEnclosingStream<S>(underlying, db);
}
public Stream<T> filter(Predicate<? super T> predicate) {
try (final Transaction tx = db.beginTx()) {
final TxEnclosingStream<T> s = newLikeThis(stream.filter(predicate));
tx.success();
return s;
}
}
public <R> Stream<R> map(Function<? super T, ? extends R> mapper) {
try (final Transaction tx = db.beginTx()) {
final TxEnclosingStream<R> s = newLikeThis(stream.map(mapper));
tx.success();
return s;
}
}
public IntStream mapToInt(ToIntFunction<? super T> mapper) {
try (final Transaction tx = db.beginTx()) {
final IntStream s = stream.mapToInt(mapper);
tx.success();
return s;
}
}
public LongStream mapToLong(ToLongFunction<? super T> mapper) {
try (final Transaction tx = db.beginTx()) {
final LongStream s = stream.mapToLong(mapper);
tx.success();
return s;
}
}
public DoubleStream mapToDouble(ToDoubleFunction<? super T> mapper) {
try (final Transaction tx = db.beginTx()) {
final DoubleStream s = stream.mapToDouble(mapper);
tx.success();
return s;
}
}
public <R> Stream<R> flatMap(Function<? super T, ? extends Stream<? extends R>> mapper) {
try (final Transaction tx = db.beginTx()) {
final TxEnclosingStream<R> s = newLikeThis(stream.flatMap(mapper));
tx.success();
return s;
}
}
public IntStream flatMapToInt(Function<? super T, ? extends IntStream> mapper) {
try (final Transaction tx = db.beginTx()) {
final IntStream s = stream.flatMapToInt(mapper);
tx.success();
return s;
}
}
public LongStream flatMapToLong(Function<? super T, ? extends LongStream> mapper) {
try (final Transaction tx = db.beginTx()) {
final LongStream s = stream.flatMapToLong(mapper);
tx.success();
return s;
}
}
public DoubleStream flatMapToDouble(Function<? super T, ? extends DoubleStream> mapper) {
try (final Transaction tx = db.beginTx()) {
final DoubleStream s = stream.flatMapToDouble(mapper);
tx.success();
return s;
}
}
@Override
public TxEnclosingStream<T> distinct() {
try (final Transaction tx = db.beginTx()) {
final TxEnclosingStream<T> s = newLikeThis(stream.distinct());
tx.success();
return s;
}
}
@Override
public TxEnclosingStream<T> sorted() {
try (final Transaction tx = db.beginTx()) {
final TxEnclosingStream<T> s = newLikeThis(stream.sorted());
tx.success();
return s;
}
}
public Stream<T> sorted(Comparator<? super T> comparator) {
try (final Transaction tx = db.beginTx()) {
final TxEnclosingStream<T> s = newLikeThis(stream.sorted(comparator));
tx.success();
return s;
}
}
public Stream<T> peek(Consumer<? super T> action) {
try (final Transaction tx = db.beginTx()) {
final TxEnclosingStream<T> s = newLikeThis(stream.peek(action));
tx.success();
return s;
}
}
@Override
public TxEnclosingStream<T> limit(long maxSize) {
try (final Transaction tx = db.beginTx()) {
final TxEnclosingStream<T> s = newLikeThis(stream.limit(maxSize));
tx.success();
return s;
}
}
@Override
public TxEnclosingStream<T> skip(long n) {
try (final Transaction tx = db.beginTx()) {
final TxEnclosingStream<T> s = newLikeThis(stream.skip(n));
tx.success();
return s;
}
}
public void forEach(Consumer<? super T> action) {
try (final Transaction tx = db.beginTx()) {
stream.forEach(action);
tx.success();
}
}
public void forEachOrdered(Consumer<? super T> action) {
try (final Transaction tx = db.beginTx()) {
stream.forEachOrdered(action);
tx.success();
}
}
@Override
public Object[] toArray() {
try (final Transaction tx = db.beginTx()) {
Object[] s = stream.toArray();
tx.success();
return s;
}
}
@Override
public <A> A[] toArray(IntFunction<A[]> generator) {
try (final Transaction tx = db.beginTx()) {
A[] s = stream.toArray(generator);
tx.success();
return s;
}
}
public T reduce(T identity, BinaryOperator<T> accumulator) {
try (final Transaction tx = db.beginTx()) {
T s = stream.reduce(identity, accumulator);
tx.success();
return s;
}
}
public Optional<T> reduce(BinaryOperator<T> accumulator) {
try (final Transaction tx = db.beginTx()) {
Optional<T> s = stream.reduce(accumulator);
tx.success();
return s;
}
}
public <U> U reduce(U identity, BiFunction<U, ? super T, U> accumulator, BinaryOperator<U> combiner) {
try (final Transaction tx = db.beginTx()) {
U s = stream.reduce(identity, accumulator, combiner);
tx.success();
return s;
}
}
public <R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner) {
try (final Transaction tx = db.beginTx()) {
R s = stream.collect(supplier, accumulator, combiner);
tx.success();
return s;
}
}
public <R, A> R collect(Collector<? super T, A, R> collector) {
try (final Transaction tx = db.beginTx()) {
R s = stream.collect(collector);
tx.success();
return s;
}
}
public Optional<T> min(Comparator<? super T> comparator) {
try (final Transaction tx = db.beginTx()) {
Optional<T> s = stream.min(comparator);
tx.success();
return s;
}
}
public Optional<T> max(Comparator<? super T> comparator) {
try (final Transaction tx = db.beginTx()) {
Optional<T> s = stream.max(comparator);
tx.success();
return s;
}
}
@Override
public long count() {
try (final Transaction tx = db.beginTx()) {
long s = stream.count();
tx.success();
return s;
}
}
public boolean anyMatch(Predicate<? super T> predicate) {
try (final Transaction tx = db.beginTx()) {
boolean s = stream.anyMatch(predicate);
tx.success();
return s;
}
}
public boolean allMatch(Predicate<? super T> predicate) {
try (final Transaction tx = db.beginTx()) {
boolean s = stream.allMatch(predicate);
tx.success();
return s;
}
}
public boolean noneMatch(Predicate<? super T> predicate) {
try (final Transaction tx = db.beginTx()) {
boolean s = stream.noneMatch(predicate);
tx.success();
return s;
}
}
@Override
public Optional<T> findFirst() {
try (final Transaction tx = db.beginTx()) {
Optional<T> s = stream.findFirst();
tx.success();
return s;
}
}
@Override
public Optional<T> findAny() {
try (final Transaction tx = db.beginTx()) {
Optional<T> s = stream.findAny();
tx.success();
return s;
}
}
@Override
public Iterator<T> iterator() {
try (final Transaction tx = db.beginTx()) {
Iterator<T> s = stream.iterator();
tx.success();
return s;
}
}
@Override
public Spliterator<T> spliterator() {
try (final Transaction tx = db.beginTx()) {
Spliterator<T> s = stream.spliterator();
tx.success();
return s;
}
}
@Override
public boolean isParallel() {
try (final Transaction tx = db.beginTx()) {
boolean s = stream.isParallel();
tx.success();
return s;
}
}
@Override
public TxEnclosingStream<T> sequential() {
try (final Transaction tx = db.beginTx()) {
final TxEnclosingStream<T> s = newLikeThis(stream.sequential());
tx.success();
return s;
}
}
@Override
public TxEnclosingStream<T> parallel() {
try (final Transaction tx = db.beginTx()) {
final TxEnclosingStream<T> s = newLikeThis(stream.parallel());
tx.success();
return s;
}
}
@Override
public TxEnclosingStream<T> unordered() {
try (final Transaction tx = db.beginTx()) {
final TxEnclosingStream<T> s = newLikeThis(stream.unordered());
tx.success();
return s;
}
}
@Override
public TxEnclosingStream<T> onClose(Runnable closeHandler) {
try (final Transaction tx = db.beginTx()) {
final TxEnclosingStream<T> s = newLikeThis(stream.onClose(closeHandler));
tx.success();
return s;
}
}
@Override
public void close() {
stream.close(); }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment