Last active
May 26, 2016 10:33
-
-
Save Tolsi/4ed32dd0f47efdd07eecb5645e657061 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scalikejdbc | |
import java.sql.{Connection, ResultSet, SQLException} | |
import java.util.{Timer, TimerTask} | |
import scala.collection._ | |
import scala.util.Try | |
object SQLToIterator {

  /**
   * Converts an SQL object that already has an extractor into its iterator-returning variant.
   *
   * The statement, raw parameters and extractor are taken from `sql`; its fetch size,
   * tags and query timeout are then copied onto the new object.
   *
   * @param sql            source SQL object (must already carry an extractor)
   * @param connectionPool pool handed to the created [[SQLToIteratorImpl]]
   * @tparam A row type produced by the extractor
   * @tparam E extractor marker type of the resulting object
   * @return an iterator-producing SQL object configured like `sql`
   */
  def apply[A, E <: WithExtractor](sql: SQL[A, HasExtractor])(implicit connectionPool: ConnectionPool): SQLToIterator[A, E] = {
    val iteratorSql =
      new SQLToIteratorImpl[A, E](sql.statement, sql.rawParameters, connectionPool)(sql.extractor)

    iteratorSql
      .fetchSize(sql.fetchSize)
      .tags(sql.tags: _*)
      .queryTimeout(sql.queryTimeout)
  }
}
/**
 * Placeholder [[DBSession]] that carries no connection and no connection attributes.
 *
 * Both `conn` and `connectionAttributes` are `null`, so this object must never be
 * used to actually talk to a database; it only satisfies an implicit `DBSession`
 * parameter.
 */
case object EmptySession extends DBSession {

  // No underlying JDBC connection exists for this session.
  override private[scalikejdbc] val conn: Connection = null

  // No attributes either; readers of this member must be prepared for null.
  override private[scalikejdbc] val connectionAttributes: DBConnectionAttributes = null

  override val isReadOnly: Boolean = true
}
/**
 * An [[SQLToResult]] whose result container is a lazily consumed `Iterator`.
 *
 * Every call to `result` borrows its own connection from `connectionPool`; the
 * [[ResultSetIterator]] that is returned takes over the session built on that
 * connection and is responsible for closing it.
 */
trait SQLToIterator[A, E <: WithExtractor] extends SQLToResult[A, E, Iterator] {

  /** Pool a dedicated connection is borrowed from for each executed query. */
  private[scalikejdbc] def connectionPool: ConnectionPool

  /**
   * Executes the statement on a freshly borrowed connection and exposes the rows
   * through an iterator mapped with `f`.
   *
   * @param f       row mapper applied to each row as the iterator is consumed
   * @param session only its `connectionAttributes` are read; the query does not
   *                run on this session's connection
   * @return an iterator over the mapped rows
   */
  def result[AA](f: WrappedResultSet => AA, session: DBSession): Iterator[AA] = {
    val newConnection = connectionPool.borrow()
    try {
      val newSession = new ActiveSession(newConnection, session.connectionAttributes, None, isReadOnly = true)
      val execution = newSession.toStatementExecutor(statement, rawParameters)
      val rs = execution.executeQuery()
      new ResultSetIterator(newSession, rs, queryTimeout).map(f)
    } catch {
      // Until the iterator exists nobody else owns the borrowed connection, so it has
      // to be given back here; otherwise every failed query would leak one connection.
      case scala.util.control.NonFatal(e) =>
        Try(newConnection.close())
        throw e
    }
  }
}
/**
 * Default [[SQLToIterator]] implementation.
 *
 * @param statement      SQL text
 * @param rawParameters  parameters bound to the statement
 * @param connectionPool pool used to obtain the connection a query runs on
 * @param extractor      function turning a row into an `A`
 */
class SQLToIteratorImpl[A, E <: WithExtractor](
  override val statement: String,
  private[scalikejdbc] override val rawParameters: Seq[Any],
  private[scalikejdbc] override val connectionPool: ConnectionPool
)(
  override val extractor: WrappedResultSet => A
) extends SQL[A, E](statement, rawParameters)(extractor)
  with SQLToIterator[A, E] {

  /** Builds a sibling instance on the same connection pool; shared by all copy methods. */
  private def rebuild[B, EE <: WithExtractor](sqlText: String, boundParams: Seq[Any])(
    rowMapper: WrappedResultSet => B
  ): SQLToIteratorImpl[B, EE] =
    new SQLToIteratorImpl[B, EE](sqlText, boundParams, connectionPool)(rowMapper)

  /** Copy with the parameters replaced. */
  override protected def withParameters(params: Seq[Any]): SQLToResult[A, E, Iterator] =
    rebuild[A, E](statement, params)(extractor)

  /** Copy with both the statement text and the parameters replaced. */
  override protected def withStatementAndParameters(state: String, params: Seq[Any]): SQLToResult[A, E, Iterator] =
    rebuild[A, E](state, params)(extractor)

  /** Copy with the extractor replaced. */
  override protected def withExtractor[B](f: WrappedResultSet => B): SQLToResult[B, HasExtractor, Iterator] =
    rebuild[B, HasExtractor](statement, rawParameters)(f)
}
object ResultSetIterator {

  /** Daemon timer, created on first use, that runs the timeout-close tasks of all iterators. */
  private[scalikejdbc] lazy val timeoutTimer = new Timer(s"scalikejdbc-iterator-timeout-timer", true)
}

/**
 * Iterator over the rows of a JDBC `ResultSet` that closes the result set and the
 * session once the rows are exhausted, reading fails, or the timeout elapses.
 *
 * The `WrappedResultSet` returned by `next()` reads lazily from the underlying
 * result set, so it is only valid until `hasNext`/`next()` is called again. For that
 * reason the cursor is advanced in `hasNext`, never inside `next()` after the row
 * wrapper has been created.
 *
 * Resources are released when `hasNext` discovers the end of the result set; a
 * consumer that stops early must call `close()` itself or rely on the timeout.
 *
 * @param session        session closed together with the result set
 * @param rs             result set to iterate
 * @param timeoutSeconds if defined, number of seconds after which the iterator is
 *                       closed from the shared timer thread
 */
class ResultSetIterator(session: DBSession, rs: ResultSet, timeoutSeconds: Option[Int]) extends Iterator[WrappedResultSet] {
  import ResultSetIterator._

  // Guards close() so that the consumer thread and the timer thread release the
  // resources exactly once.
  private[this] val closed = new java.util.concurrent.atomic.AtomicBoolean(false)

  @volatile
  private[this] var lastPosition = 0

  // true when rs.next() has been called for the upcoming row and not yet consumed
  @volatile
  private[this] var fetched: Boolean = false

  // result of the most recent rs.next()
  @volatile
  private[this] var rowAvailable: Boolean = false

  private[this] val closeTimerTask: Option[TimerTask] = timeoutSeconds.map(_ => new CloseTimerTask)

  // Schedule only after the field above is assigned, so a task that fires
  // immediately never observes an uninitialised iterator. `1000L` keeps the
  // millisecond conversion in Long arithmetic.
  for {
    task <- closeTimerTask
    seconds <- timeoutSeconds
  } timeoutTimer.schedule(task, seconds * 1000L)

  // Position on the first row right away: an empty result set is closed
  // immediately and a failing query surfaces at construction time.
  hasNext

  /**
   * Moves the cursor to the next row if that has not been done yet.
   *
   * @return true if a row is available for `next()`
   */
  override def hasNext: Boolean = {
    if (!fetched) {
      rowAvailable =
        try rs.next()
        catch {
          case scala.util.control.NonFatal(e) =>
            // a broken result set cannot be continued; do not keep the connection
            close()
            throw e
        }
      fetched = true
      if (!rowAvailable) {
        close()
      }
    }
    rowAvailable
  }

  /**
   * Returns a wrapper over the current row.
   *
   * @throws NoSuchElementException if there are no more rows
   */
  override def next(): WrappedResultSet = {
    if (!hasNext) {
      throw new NoSuchElementException("next on exhausted ResultSetIterator")
    }
    fetched = false
    lastPosition += 1
    val cursor = new ResultSetCursor(lastPosition)
    new WrappedResultSet(rs, cursor, cursor.position)
  }

  /**
   * Cancels the timeout task and closes the result set and the session.
   * Failures while closing are ignored; repeated calls do nothing.
   */
  def close(): Unit = {
    if (closed.compareAndSet(false, true)) {
      closeTimerTask.foreach(_.cancel())
      Try {
        rs.close()
      }
      Try {
        session.close()
      }
    }
  }

  /** Timer task that closes this iterator when the timeout elapses. */
  private class CloseTimerTask extends TimerTask {
    override def run(): Unit = close()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Usage example: streams the result of a query as an iterator inside a `Future`.
 *
 * NOTE(review): `db` is not used by the visible code. `SQLToIterator.apply` requires
 * an implicit `ConnectionPool` and `Future { ... }` requires an implicit
 * `ExecutionContext`; both are assumed to be in scope where this class is compiled.
 */
class MyRepository(db: DBConnection) {

  /**
   * Runs the query asynchronously.
   *
   * @return a future holding an iterator over the first column of every row
   */
  def get: Future[Iterator[String]] = {
    Future {
      // Only needed to satisfy the implicit DBSession parameter of apply().
      implicit val session: DBSession = EmptySession
      // The factory is SQLToIterator.apply; E is given explicitly because it
      // cannot be inferred from the arguments.
      SQLToIterator[String, HasExtractor](
        sql"""SELECT big_strings from db""".fetchSize(50).queryTimeout(10).map(_.string(1))
      ).apply()
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment