Skip to content

Instantly share code, notes, and snippets.

@justinhj
Last active October 12, 2016 21:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save justinhj/d2ee95f565e179e62d9510171cb0c7a0 to your computer and use it in GitHub Desktop.
Save justinhj/d2ee95f565e179e62d9510171cb0c7a0 to your computer and use it in GitHub Desktop.
Demo of concurrency handling in for comprehension using Futures
// Demonstrating futures running concurrently and in sequence
import scala.concurrent.{ExecutionContext, Future}
import scala.language.postfixOps
// A simple execution context for us to use that will not shut down the JVM until the threads we start have all finished
// It does that by making the threads non-Daemon
// http://stackoverflow.com/questions/25236143/why-future-example-do-not-work
// http://stackoverflow.com/questions/2213340/what-is-daemon-thread-in-java
class NonDaemonThreadExecutionContext extends ExecutionContext {
override def execute(runnable:Runnable) = {
val t = new Thread(runnable)
t.setDaemon(false)
t.start()
}
override def reportFailure(t:Throwable) = t.printStackTrace
}
object futureTest {
// a simple function that creates a future that will run for two seconds
def twoSecondTask(name: String) = Future[Int] {
Thread.sleep(2000)
println(s"two seconds $name")
2
}
// same but for five seconds
def fiveSecondTask(name: String) = Future[Int] {
Thread.sleep(5000)
println(s"five seconds $name")
5
}
implicit val ec = new NonDaemonThreadExecutionContext()
def main(args: Array[String]): Unit = {
val startTime = System.currentTimeMillis()
// takes 7 seconds
val results = for(
a <- twoSecondTask("insidefor");
b <- fiveSecondTask("insidefor")
) yield (a,b)
// desugared to this
// observe that twoSecondTask is created right away whilst fiveSecondTask won't be created until
// a successfully completes
// val results = twoSecondTask.flatMap(((a) => fiveSecondTask.map(((b) => scala.Tuple2(a, b)))));
// display the final result
results foreach {
thing =>
val time = System.currentTimeMillis()
println(s"futures in for comprehension $thing at t=${time - startTime}")
}
// takes 5 seconds if we create the futures before the for comprehension
// since they both start together outside of the for comprehension
val f1 = twoSecondTask("outsidefor")
val f2 = fiveSecondTask("outsidefor")
val preStartResult = for (
a <- f1;
b <- f2
) yield (a,b)
// desugared to
// val preStartResult = f1.flatMap(((a) => f2.map(((b) => scala.Tuple2(a, b)))));
preStartResult foreach {
things =>
val time3 = System.currentTimeMillis()
println(s"futures started outside of for comprehension ${things} at t=${time3 - startTime}")
}
// takes 5 seconds if compose the futures as a sequence and run them together
// but what if we don't want them to start before the for comprehension because of dependencies?
// we can compose them and run concurrently inside the for loop ...
// let's pretend this dependency is some external process we need to run
// and we don't want to run the other futures unless it succeeds
val r = scala.util.Random
val dependency = Future[Boolean] {
val dependencySucceeded = r.nextInt(100) > 90
println(s"dependency succeeded = $dependencySucceeded")
dependencySucceeded
}
val dependentConcurrency = for(
x <- dependency if (x == true);
bothDependent <- Future.sequence(List(twoSecondTask("dependency"), fiveSecondTask("dependency")))
) yield bothDependent
// val dependentConcurrency = dependency.withFilter(((x) => x.$eq$eq(true))).flatMap(((x)
// => Future.sequence(List(twoSecondTask("dependency"), fiveSecondTask("dependency"))).map(((bothDependent) => bothDependent))));
dependentConcurrency foreach {
things =>
val time = System.currentTimeMillis()
println(s"futures dependent on another in sequence ${things} at t=${time - startTime}")
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment