Skip to content

Instantly share code, notes, and snippets.

@Tolsi
Last active May 26, 2016 10:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Tolsi/4ed32dd0f47efdd07eecb5645e657061 to your computer and use it in GitHub Desktop.
Save Tolsi/4ed32dd0f47efdd07eecb5645e657061 to your computer and use it in GitHub Desktop.
package scalikejdbc
import java.sql.{Connection, ResultSet, SQLException}
import java.util.{Timer, TimerTask}
import scala.collection._
import scala.util.Try
object SQLToIterator {
def apply[A, E <: WithExtractor](sql: SQL[A, HasExtractor])(implicit connectionPool: ConnectionPool): SQLToIterator[A, E] = {
new SQLToIteratorImpl[A, E](sql.statement, sql.rawParameters, connectionPool)(sql.extractor)
.fetchSize(sql.fetchSize).tags(sql.tags: _*).queryTimeout(sql.queryTimeout)
}
}
case object EmptySession extends DBSession {
override val isReadOnly: Boolean = true
override private[scalikejdbc] val connectionAttributes: DBConnectionAttributes = null
override private[scalikejdbc] val conn: Connection = null
}
trait SQLToIterator[A, E <: WithExtractor] extends SQLToResult[A, E, Iterator] {
private[scalikejdbc] def connectionPool: ConnectionPool
def result[AA](f: WrappedResultSet => AA, session: DBSession): Iterator[AA] = {
val newConnection = connectionPool.borrow()
val newSession = new ActiveSession(newConnection, session.connectionAttributes, None, isReadOnly = true)
val execution = newSession.toStatementExecutor(statement, rawParameters)
val rs = execution.executeQuery()
new ResultSetIterator(newSession, rs, queryTimeout).map(f)
}
}
class SQLToIteratorImpl[A, E <: WithExtractor](
override val statement: String, private[scalikejdbc] override val rawParameters: Seq[Any],
private[scalikejdbc] override val connectionPool: ConnectionPool)(
override val extractor: WrappedResultSet => A
)
extends SQL[A, E](statement, rawParameters)(extractor)
with SQLToIterator[A, E] {
override protected def withParameters(params: Seq[Any]): SQLToResult[A, E, Iterator] = {
new SQLToIteratorImpl[A, E](statement, params, connectionPool)(extractor)
}
override protected def withStatementAndParameters(state: String, params: Seq[Any]): SQLToResult[A, E, Iterator] = {
new SQLToIteratorImpl[A, E](state, params, connectionPool)(extractor)
}
override protected def withExtractor[B](f: WrappedResultSet => B): SQLToResult[B, HasExtractor, Iterator] = {
new SQLToIteratorImpl[B, HasExtractor](statement, rawParameters, connectionPool)(f)
}
}
object ResultSetIterator {
private[scalikejdbc] lazy val timeoutTimer = new Timer(s"scalikejdbc-iterator-timeout-timer", true)
}
class ResultSetIterator(session: DBSession, rs: ResultSet, timeoutSeconds: Option[Int]) extends Iterator[WrappedResultSet] {
import ResultSetIterator._
@volatile
private[this] var lastPosition = 0
@volatile
private[this] var nextFlag: Boolean = true
@volatile
private[this] var closeTimerTask: Option[TimerTask] = timeoutSeconds.map { timeoutSeconds =>
val tt = new CloseTimerTask
timeoutTimer.schedule(tt, timeoutSeconds * 1000)
tt
}
override def hasNext: Boolean = nextFlag
private def updateNextFlag() = nextFlag = rs.next()
{
updateNextFlag()
}
override def next(): WrappedResultSet = {
lastPosition += 1
val cursor = new ResultSetCursor(lastPosition)
val r = new WrappedResultSet(rs, cursor, cursor.position)
updateNextFlag()
if (!hasNext) {
close()
}
r
}
def close() = {
closeTimerTask.foreach(_.cancel())
if (!rs.isClosed) {
Try {
rs.close()
}
Try {
session.close()
}
}
}
private class CloseTimerTask extends TimerTask {
override def run(): Unit = close()
}
}
class MyRepository(db: DBConnection) {
def get: Future[Iterator[String]] = {
Future {
implicit val session = EmptySession
SQLToIterator.toIterator(sql"""SELECT big_strings from db""".fetchSize(50).queryTimeout(10).map(_.string(1))).apply()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment