Last active
October 12, 2016 21:25
-
-
Save justinhj/d2ee95f565e179e62d9510171cb0c7a0 to your computer and use it in GitHub Desktop.
Demo of concurrency handling in for comprehension using Futures
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Demonstrating futures running concurrently and in sequence | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.language.postfixOps | |
// A simple execution context for us to use that will not shut down the JVM until the threads we start have all finished | |
// It does that by making the threads non-Daemon | |
// http://stackoverflow.com/questions/25236143/why-future-example-do-not-work | |
// http://stackoverflow.com/questions/2213340/what-is-daemon-thread-in-java | |
class NonDaemonThreadExecutionContext extends ExecutionContext { | |
override def execute(runnable:Runnable) = { | |
val t = new Thread(runnable) | |
t.setDaemon(false) | |
t.start() | |
} | |
override def reportFailure(t:Throwable) = t.printStackTrace | |
} | |
object futureTest { | |
// a simple function that creates a future that will run for two seconds | |
def twoSecondTask(name: String) = Future[Int] { | |
Thread.sleep(2000) | |
println(s"two seconds $name") | |
2 | |
} | |
// same but for five seconds | |
def fiveSecondTask(name: String) = Future[Int] { | |
Thread.sleep(5000) | |
println(s"five seconds $name") | |
5 | |
} | |
implicit val ec = new NonDaemonThreadExecutionContext() | |
def main(args: Array[String]): Unit = { | |
val startTime = System.currentTimeMillis() | |
// takes 7 seconds | |
val results = for( | |
a <- twoSecondTask("insidefor"); | |
b <- fiveSecondTask("insidefor") | |
) yield (a,b) | |
// desugared to this | |
// observe that twoSecondTask is created right away whilst fiveSecondTask won't be created until | |
// a successfully completes | |
// val results = twoSecondTask.flatMap(((a) => fiveSecondTask.map(((b) => scala.Tuple2(a, b))))); | |
// display the final result | |
results foreach { | |
thing => | |
val time = System.currentTimeMillis() | |
println(s"futures in for comprehension $thing at t=${time - startTime}") | |
} | |
// takes 5 seconds if we create the futures before the for comprehension | |
// since they both start together outside of the for comprehension | |
val f1 = twoSecondTask("outsidefor") | |
val f2 = fiveSecondTask("outsidefor") | |
val preStartResult = for ( | |
a <- f1; | |
b <- f2 | |
) yield (a,b) | |
// desugared to | |
// val preStartResult = f1.flatMap(((a) => f2.map(((b) => scala.Tuple2(a, b))))); | |
preStartResult foreach { | |
things => | |
val time3 = System.currentTimeMillis() | |
println(s"futures started outside of for comprehension ${things} at t=${time3 - startTime}") | |
} | |
// takes 5 seconds if compose the futures as a sequence and run them together | |
// but what if we don't want them to start before the for comprehension because of dependencies? | |
// we can compose them and run concurrently inside the for loop ... | |
// let's pretend this dependency is some external process we need to run | |
// and we don't want to run the other futures unless it succeeds | |
val r = scala.util.Random | |
val dependency = Future[Boolean] { | |
val dependencySucceeded = r.nextInt(100) > 90 | |
println(s"dependency succeeded = $dependencySucceeded") | |
dependencySucceeded | |
} | |
val dependentConcurrency = for( | |
x <- dependency if (x == true); | |
bothDependent <- Future.sequence(List(twoSecondTask("dependency"), fiveSecondTask("dependency"))) | |
) yield bothDependent | |
// val dependentConcurrency = dependency.withFilter(((x) => x.$eq$eq(true))).flatMap(((x) | |
// => Future.sequence(List(twoSecondTask("dependency"), fiveSecondTask("dependency"))).map(((bothDependent) => bothDependent)))); | |
dependentConcurrency foreach { | |
things => | |
val time = System.currentTimeMillis() | |
println(s"futures dependent on another in sequence ${things} at t=${time - startTime}") | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment