Skip to content

Instantly share code, notes, and snippets.

@kermitas
Last active February 16, 2016 16:18
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kermitas/a19a89003a50a1812e4f to your computer and use it in GitHub Desktop.
Save kermitas/a19a89003a50a1812e4f to your computer and use it in GitHub Desktop.
Short lecture about Future(s) at #scala45pl, 2015-10-24, http://www.meetup.com/WarszawScaLa/events/225320171/ .
package scala.concurrent
import duration._
import scala.util.{ Success, Try }
import org.scalatest.FeatureSpec
/**
* #scala45pl http://www.meetup.com/WarszawScaLa/events/225320171/
*
* Also during this presentation:
* - https://gist.github.com/kermitas/41c456c839645ab300d3
* - https://gist.github.com/kermitas/0edade6d73e02dd70ea2
* - https://gist.github.com/kermitas/c102d361204978ea66a1
*/
class Scala45PlFutureTest extends FeatureSpec {
scenario("#scala45pl: Future initialization (explicit ExecutionContext)") {
val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
val f = Future {
Thread.sleep(1000)
val result = "Done"
System.out.println("From Future: " + result)
result
}(ec)
f.onComplete { result: Try[String] =>
System.out.println(result)
}(ec)
/**
* Will produce on console:
*
* From Future: Done
* Success(Done)
*/
}
scenario("#scala45pl: Future initialization (implicit ExecutionContext)") {
implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
val f = Future {
Thread.sleep(1000)
val result = "Done"
System.out.println("From Future: " + result)
result
}
f.onComplete { result: Try[String] =>
System.out.println(result)
}
/**
* Will produce on console:
*
* From Future: Done
* Success(Done)
*/
}
scenario("#scala45pl: creating Promise, taking Future, register to future.onComplete()") {
implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
// that is the Promise we will complete once computation result is ready
val promise: Promise[String] = Promise()
// taking Future that we can spread across the application
val future: Future[String] = promise.future
// look out: future.onComplete() has side effect (it will 'remember all' registered functions)
future.onComplete { result: Try[String] =>
System.out.println("result 1 start = " + result)
Thread.sleep(1000)
System.out.println("result 1 finish")
}
// look out: again, side effect
future.onComplete { result: Try[String] =>
System.out.println("result 2 start = " + result)
Thread.sleep(500)
System.out.println("result 2 finish")
}
System.out.println("completing the Promise")
promise.complete(Success("scala45pl")) // <------ completing the Promise (and thus that all functions registered to onComplete() will be executed with threads taken from thread pool)
// look out: side effect one more time,
// but wait! at this moment we are sure for 100% that Future (the Promise to be precise) is completed,
// will registered function below be executed?
future.onComplete { result: Try[String] =>
System.out.println("result 3 start = " + result)
Thread.sleep(200)
System.out.println("result 3 finish")
}
Await.result(future, 5 seconds)
System.out.println("unit test finish")
Thread.sleep(1500) // <-- block this test for a little till all logs will be printed on console
/**
* Will produce on console:
*
* completing the Promise
* result 1 start = Success(scala45pl)
* result 2 start = Success(scala45pl)
* result 3 start = Success(scala45pl)
* unit test finish
* result 3 finish
* result 2 finish
* result 1 finish
*/
}
def createSimpleFuture(sleepTime: Long)(implicit ec: ExecutionContext): Future[String] = Future {
Thread.sleep(sleepTime)
"Done " + sleepTime
}
scenario("#scala45pl: serial initialization of three Future(s)") {
implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
val startTime = System.currentTimeMillis
// serial initialization leads to starting following Future only when previous one was completed
for {
result1: String <- createSimpleFuture(300)
result2: String <- createSimpleFuture(100)
result3: String <- createSimpleFuture(200)
} {
System.out.println("execution time = " + (System.currentTimeMillis - startTime) + "ms")
}
Thread.sleep(1500) // <-- block this test for a little till all logs will be printed on console
/**
* Will produce on console:
*
* result1=Done 300, result2=Done 100, result3=Done 200
* execution time = 609ms
*/
}
scenario("#scala45pl: parallel initialization of three Future(s)") {
implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
val startTime = System.currentTimeMillis
// parallel initialization
val future1: Future[String] = createSimpleFuture(300)
val future2: Future[String] = createSimpleFuture(100)
val future3: Future[String] = createSimpleFuture(200)
for {
result1 <- future1
result2 <- future2
result3 <- future3
} {
System.out.println(s"result1=$result1, result2=$result2, result3=$result3")
System.out.println("execution time = " + (System.currentTimeMillis - startTime) + "ms")
}
Thread.sleep(1500) // <-- block this test for a little till all logs will be printed on console
/**
* Will produce on console:
*
* result1=Done 300, result2=Done 100, result3=Done 200
* execution time = 301ms
*/
}
scenario("#scala45pl: Future.sequence()") {
implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
val startTime = System.currentTimeMillis
val future1: Future[String] = createSimpleFuture(300)
val future2: Future[String] = createSimpleFuture(100)
val future3: Future[String] = createSimpleFuture(200)
val seqOfFutures: Seq[Future[String]] = Seq(future1, future2, future3)
// How to change Seq[Future[String]] into Future[Seq[String]] ?
val result: Future[Seq[String]] = Future.sequence(seqOfFutures)
result.onComplete { result: Try[Seq[String]] =>
System.out.println(s"result=$result")
System.out.println("execution time = " + (System.currentTimeMillis - startTime) + "ms")
}
Thread.sleep(1500) // <-- block this test for a little till all logs will be printed on console
/**
* Will produce on console:
*
* result=Success(List(Done 300, Done 100, Done 200))
* execution time = 305ms
*/
}
trait Model
trait Id[M <: Model]
scenario("#scala45pl: method declaration - wrong") {
trait Dao[M <: Model] {
def exists(id: Id[M]): Future[Boolean]
}
// simple question: how pass dedicated execution context dao.exists()?
// (for example: calling-thread-ExecutionContext in unit test)
}
scenario("#scala45pl: method declaration - good (we can pass any ExecutionContext to exists() method)") {
trait Dao[M <: Model] {
def exists(id: Id[M])(implicit ec: ExecutionContext): Future[Boolean]
}
// not we can pass any ExecutionContext we would like to!
}
trait Dao[M <: Model] {
def exists(id: Id[M])(implicit ec: ExecutionContext): Future[Boolean]
}
class User extends Model
scenario("#scala45pl: method body - wrong") {
trait UserDao extends Dao[User] {
override def exists(id: Id[User])(implicit ec: ExecutionContext): Future[Boolean] = {
val id = 77 / 0 // do some (potentially danger) operation here
val result: Boolean = {
// execute real database query here; let's say that id - passed to this method - was found in some table
true
}
// do some (potentially danger) operation here
Future.successful(result)
}
}
}
scenario("#scala45pl: method body - better (whole method body is covered by Future)") {
trait UserDao extends Dao[User] {
override def exists(id: Id[User])(implicit ec: ExecutionContext): Future[Boolean] = Future {
val id = 77 / 0 // do some (potentially danger) operation here
val result: Boolean = {
// execute real database query here; let's say that id - passed to this method - was found in some table
true
}
result
}
}
}
scenario("#scala45pl: some methods that does not take ExecutionContext") {
// Look out, some methods that we like and use from time to time ... does not allow us to pass ExecutionContext:
// - scala.concurrent.Future.mapTo
// http://www.scala-lang.org/api/current/index.html#scala.concurrent.Future@mapTo[S](implicittag:scala.reflect.ClassTag[S]):scala.concurrent.Future[S]
// - akka.pattern.ask
// http://doc.akka.io/api/akka/2.4.0/?_ga=1.236123777.1217029815.1422907272#akka.pattern.AskSupport
// That leads to potentially danger operation of spawning tones of Future(s):
// please discover: no way to pass ExecutionContext
def methodThatWillUseItsOwnExecutionContext(initialValue: Int): Future[Int] = {
// method will use some ExecutionContext, definitely other than we
implicit val ec: ExecutionContext = scala.concurrent.ExecutionContext.Implicits.global
Future {
initialValue
}.map { i: Int =>
i * 2
}.map { i: Int =>
i * 2
}
}
def test(initialValue:Int)(implicit ec: ExecutionContext): Future[Int] =
Future {
System.out.println("test(): executed with " + initialValue)
2
}.map { i: Int =>
i + 2
}.flatMap { i: Int =>
methodThatWillUseItsOwnExecutionContext(i)
}.map { i: Int =>
i + 2
}.map { i: Int =>
Thread.sleep(100)
System.out.println("test(): result = " + i)
i
}
implicit val callingThreadExecutionContext = new CallingThreadExecutionContext
test(1)
test(1)
test(1)
test(1)
Thread.sleep(1500) // <-- block this test for a little till all logs will be printed on console
/**
* Will produce on console (please see that we spawned 4 parallel Future(s) while we thought they will be processed one-by-one):
*
* test(): executed with 1
* test(): result = 18
* test(): executed with 1
* test(): executed with 1
* test(): executed with 1
* test(): result = 18
* test(): result = 18
* test(): result = 18
*
* ... BUT if methodThatWillUseItsOwnExecutionContext() would allow us to pass our own ExecutionContext then it would be:
*
* test(): executed with 1
* test(): result = 18
* test(): executed with 1
* test(): result = 18
* test(): executed with 1
* test(): result = 18
* test(): executed with 1
* test(): result = 18
*/
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment