Created
January 27, 2017 10:39
-
-
Save gbersac/c4d3a859eac815f1600144fe1ffea2a9 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A deferred database computation: a function from a `Connection` to an `A`, bundled with
 * the database it targets and the execution context it will run on.
 *
 * Building and combining queries (`map`, `flatMap`, `zip`) only composes functions; nothing
 * touches the database until `commit` or `commitTransaction` is called.
 *
 * @param db     database handle; `db.getDB` must expose `withConnection` / `withTransaction`
 * @param ec     execution context on which the query body and the failure logging run
 * @param atomic the function to run against a connection
 */
case class Query[D <: Database, +A](db: D, ec: ExecutionContext)(private val atomic: Connection => A) extends Logger {

  /** Returns a query that applies `f` to this query's result. */
  def map[B](f: A => B): Query[D, B] =
    Query(db, ec)(f compose atomic)

  /**
   * Returns a query that runs this query, feeds its result to `f`, and runs the query
   * produced by `f` on the same connection.
   */
  def flatMap[B](f: A => Query[D, B]): Query[D, B] =
    Query(db, ec)(connection => f(atomic(connection)).atomic(connection))

  /** Runs this query, then `query`, on the same connection and pairs the two results. */
  def zip[B](query: Query[D, B]): Query[D, (A, B)] =
    flatMap(a => query.map(b => (a, b)))

  /**
   * Runs the query through `db.getDB.withConnection` inside a `Future` on `ec`.
   *
   * @param logException when true, a failure carrying an `Exception` is logged as
   *                     "Connection failed"
   * @return the future result of the query; failures are still propagated to the caller
   */
  def commit(logException: Boolean = true): Future[A] =
    runLogged(logException, "Connection failed")(db.getDB.withConnection(atomic))

  /**
   * Runs the query through `db.getDB.withTransaction` inside a `Future` on `ec`.
   * Use commitTransaction instead of commit if you have dependent update queries.
   *
   * @param logException when true, a failure carrying an `Exception` is logged as
   *                     "Transaction failed"
   * @return the future result of the query; failures are still propagated to the caller
   */
  def commitTransaction(logException: Boolean = true): Future[A] =
    runLogged(logException, "Transaction failed")(db.getDB.withTransaction(atomic))

  /**
   * Evaluates `body` in a `Future` on `ec` and, if requested, logs an `Exception` failure.
   *
   * Generic in `R` (rather than using `A`) so the by-name parameter does not put the
   * covariant `A` in a contravariant position. Uses `onComplete` because `onFailure` is
   * deprecated since Scala 2.12 and removed in 2.13.
   */
  private def runLogged[R](logException: Boolean, failureMessage: String)(body: => R): Future[R] = {
    val result = Future(body)(ec)
    if (logException) {
      result.onComplete {
        case scala.util.Failure(ex: Exception) => LOG.error(failureMessage, ex)
        case _                                 => ()
      }(ec)
    }
    result
  }
}
/**
 * Factory for [[Query]] values bound to one database and one execution context.
 *
 * @param db          database every query built here will run against
 * @param actorSystem actor system whose dispatcher configuration supplies the execution context
 */
case class QueryBuilder[D <: Database](db: D, actorSystem: ActorSystem) {

  // Code interfacing with the DB gets its own thread pool so that HTTP request threads do not
  // block during DB IO. Looked up from the "database.execution-context" dispatcher.
  lazy val ec = actorSystem.dispatchers.lookup("database.execution-context")

  /** Wraps a value in a query that ignores the connection. `a` is by-name: it is evaluated on each run. */
  def pure[A](a: => A): Query[D, A] = Query(db, ec)(_ => a)

  /** Turns an optional query into a query of an optional result; `None` yields a query returning `None`. */
  def seq[A](maybeTx: Option[Query[D, A]]): Query[D, Option[A]] =
    maybeTx match {
      case Some(tx) => tx.map(a => Some(a))
      case None     => pure(None)
    }

  /**
   * Turns a query that produces a `Future[A]` into a future of a query holding the resolved `A`.
   *
   * The only way to obtain the inner future is to run `q`, so this commits `q` immediately
   * (via `commit()`, i.e. not inside a transaction) and then waits for the future it returned.
   * The previous implementation only called `q.map(...)` and discarded the result, so the
   * query never ran and the returned future never completed.
   *
   * @param q query whose result is itself asynchronous
   * @return a future that completes with a `pure` query of the value, or fails if running `q`
   *         or the inner future fails
   */
  def futureQuery[A](q: Query[D, Future[A]]): Future[Query[D, A]] =
    q.commit().flatMap { inner =>
      inner.map(value => pure(value))(ec)
    }(ec)

  /**
   * Combines a collection of queries into one query that runs them in order on the same
   * connection and collects their results into a collection of the same shape.
   */
  def seq[A, F[X] <: TraversableOnce[X]](tas: F[Query[D, A]])(implicit cbf: CanBuildFrom[F[Query[D, A]], A, F[A]]): Query[D, F[A]] =
    // `pure` is by-name, so a fresh builder is created from `cbf` each time the query is run.
    tas.foldLeft(pure(cbf(tas))) { (tr, ta) =>
      for {
        r <- tr
        a <- ta
      } yield r += a
    } map (_.result())

  /** A query that throws `ex` when it is run. */
  def failure[T](ex: Throwable): Query[D, T] = pure[T](throw ex)

  /** Builds a query from a raw `Connection => A` function. */
  def apply[A](atomic: Connection => A): Query[D, A] = Query(db, ec)(atomic)
}
/**
 * Illustrates the composition problem:
 *  - callDB1 and callDB2 return a Query
 *  - callES1 and callES2 return a Future
 * A Query and a Future cannot be combined in a single chain, so alternating between them
 * produces the nested type Query[_, Future[Query[_, Future[T]]]].
 */
def myFunction[T]: Query[NonBancaireDB, Future[Query[NonBancaireDB, Future[T]]]] = {
  callDB1.map { vDB1 =>
    callES1(vDB1).map { vES1 =>
      callDB2(vES1).map { vDB2 =>
        // NOTE(review): callES1 is used a second time here; possibly callES2 was intended — confirm.
        callES1(vDB2)
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment