Skip to content

Instantly share code, notes, and snippets.

@igouss
Created September 22, 2015 22:12
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save igouss/2670d6634d0d84470bf5 to your computer and use it in GitHub Desktop.
Save igouss/2670d6634d0d84470bf5 to your computer and use it in GitHub Desktop.
RxJava and Hibernate Scrollable query Raw
package com.naxsoft.database;
import org.hibernate.Query;
import org.hibernate.ScrollMode;
import org.hibernate.ScrollableResults;
import org.hibernate.StatelessSession;
import rx.Observable;
/**
* Copyright NAXSoft 2015
*/
public class ObservableQuery<T> {
private static final int BATCH_SIZE = 20;
private Database database;
public ObservableQuery(Database database) {
this.database = database;
}
public Observable<T> execute(String queryString) {
return Observable.defer(() -> Observable.using(this::getStatelessSession,
session -> executeQuery(queryString, session),
StatelessSession::close));
}
private StatelessSession getStatelessSession() {
return database.getSessionFactory().openStatelessSession();
}
private Observable<T> executeQuery(String queryString, StatelessSession session) {
return Observable.using(() -> getScrollableResults(queryString, session),
scrollableResults -> scrollResults(scrollableResults),
scrollableResults -> scrollableResults.close());
}
private ScrollableResults getScrollableResults(String queryString, StatelessSession session) {
Query query = session.createQuery(queryString);
query.setCacheable(false);
query.setReadOnly(true);
query.setFetchSize(BATCH_SIZE);
return query.scroll(ScrollMode.FORWARD_ONLY);
}
private Observable<T> scrollResults(ScrollableResults result) {
return Observable.<T>create(subscriber -> {
while (!subscriber.isUnsubscribed() && result.next()) {
subscriber.onNext((T) result.get(0));
}
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment