public <T> T streamQuery(String sql, Function<Stream<SqlRowSet>, ? extends T> streamer, Object... args) {
    return jdbcTemplate.query(sql, resultSet -> {
        final SqlRowSet rowSet = new ResultSetWrappingSqlRowSet(resultSet);
        final boolean parallel = false;
        // The ResultSet API has a slight impedance mismatch with Iterators: the only way to
        // learn whether another row exists is to advance the cursor. Testing isLast() in
        // hasNext() would drop the final row, so hasNext() advances and caches the outcome.
        Spliterator<SqlRowSet> spliterator = Spliterators.spliteratorUnknownSize(new Iterator<SqlRowSet>() {
            private boolean onUnreadRow = false; // cursor sits on a row not yet handed out
            private boolean exhausted = false;   // rowSet.next() has already returned false
            @Override
            public boolean hasNext() {
                if (!onUnreadRow && !exhausted) {
                    onUnreadRow = rowSet.next();
                    exhausted = !onUnreadRow;
                }
                return onUnreadRow;
            }
            @Override
            public SqlRowSet next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                onUnreadRow = false;
                // The same cursor object is returned every time; read its columns before advancing.
                return rowSet;
            }
        }, Spliterator.IMMUTABLE);
        // The stream is only valid inside this callback, while the ResultSet is still open.
        return streamer.apply(StreamSupport.stream(spliterator, parallel));
    }, args);
}

jillesvangurp commented Jun 11, 2018

Thanks for putting me on the right track with this. I am using this from Kotlin and came up with a streaming version that works fine as long as you use it inside a transaction. Also, you have a bug in your exit condition that causes the last row to be skipped. I fixed this by making hasNext() do most of the work (a good idea when implementing iterators) and by streaming the converted rows rather than the row set itself. This also gets rid of the problem you have with the first row.

fun <T> queryStream(sql: String, converter: (SqlRowSet) -> T, args: Array<Any>): Stream<T> {
    val rowSet = jdbcTemplate.queryForRowSet(sql, *args)

    // hasNext() advances the cursor and buffers the converted row until next() hands it out.
    val iterator = object : Iterator<T> {
        // Tracked separately from `current` so that a null conversion result still counts as a row.
        private var buffered = false
        private var current: T? = null

        override fun hasNext(): Boolean {
            if (!buffered && rowSet.next()) {
                current = converter(rowSet)
                buffered = true
            }
            return buffered
        }

        @Suppress("UNCHECKED_CAST")
        override fun next(): T {
            if (!hasNext()) throw NoSuchElementException()
            buffered = false
            return current as T
        }
    }

    val spliterator = Spliterators.spliteratorUnknownSize(iterator, Spliterator.IMMUTABLE)
    return StreamSupport.stream(spliterator, false)
}

This is Kotlin but you should be able to convert this back to Java easily. I use JdbcTemplate together with a TransactionTemplate to make sure the connection stays open while I stream the results.
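
For readers who want the Java form, a minimal sketch of the same look-ahead iterator might look like the following. It is not the code from this thread: the `CursorStreams` class, the `Cursor` interface, and the `cursorOver` helper are hypothetical stand-ins for `SqlRowSet`, introduced so the sketch runs with the JDK alone. Swapping in a real `SqlRowSet` should only require replacing `Cursor`, assuming the row set is still readable while the stream is consumed.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class CursorStreams {

    /** Hypothetical stand-in for SqlRowSet: a forward-only cursor that must be advanced before reading. */
    interface Cursor {
        boolean next();   // moves to the next row; returns false when no rows are left
        String value();   // reads the row the cursor currently sits on
    }

    /** Streams converter(cursor) once per row; the cursor is advanced lazily inside hasNext(). */
    static <T> Stream<T> stream(Cursor cursor, Function<Cursor, T> converter) {
        Iterator<T> iterator = new Iterator<T>() {
            private boolean buffered = false;   // a converted row is waiting to be handed out
            private boolean exhausted = false;  // cursor.next() has already returned false
            private T current;

            @Override
            public boolean hasNext() {
                if (!buffered && !exhausted) {
                    if (cursor.next()) {
                        current = converter.apply(cursor);
                        buffered = true;
                    } else {
                        exhausted = true;
                    }
                }
                return buffered;
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                buffered = false;
                T result = current;
                current = null; // drop the reference once the row has been handed out
                return result;
            }
        };
        // ORDERED because rows arrive in cursor order; false requests a sequential stream.
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
    }

    /** In-memory cursor over a list, used only to exercise the adapter. */
    static Cursor cursorOver(List<String> rows) {
        return new Cursor() {
            private int index = -1;

            @Override
            public boolean next() {
                if (index + 1 < rows.size()) {
                    index++;
                    return true;
                }
                return false;
            }

            @Override
            public String value() {
                return rows.get(index);
            }
        };
    }

    public static void main(String[] args) {
        List<String> upper = stream(cursorOver(Arrays.asList("a", "b", "c")),
                c -> c.value().toUpperCase()).collect(Collectors.toList());
        System.out.println(upper); // prints [A, B, C]
    }
}
```

Because each row is converted before the cursor advances again, the stream's elements are independent values rather than repeated references to the same moving cursor, which is why collecting them into a list is safe here.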
