Skip to content

Instantly share code, notes, and snippets.

@binshuohu
Last active December 8, 2015 13:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save binshuohu/6b6454d81cf5726a36d8 to your computer and use it in GitHub Desktop.
Save binshuohu/6b6454d81cf5726a36d8 to your computer and use it in GitHub Desktop.
separate computation from io
package com.vpon.slickit
import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
sealed trait Task {
def execute(parameter: String): String
}
class ShortTask extends Task {
def execute(parameter: String): String = "done"
}
class LongTask extends Task {
def execute(parameter: String): String = {
(1 to 1000).foldLeft(1.0) { (a, x) =>
(1 to x).foldLeft(a) { (b, y) => b + y * 23.5 }
}.toString
}
}
object Main {
def main(args: Array[String]): Unit = {
val tasks = (1 to 1000) map { p =>
if (p % 2 == 0) new ShortTask else new LongTask
}
//Basically, LongTask stands for a blocking operation.
//In a reactive system, we don't want those time consuming operations like IO
//to slow down the whole system, so it's really essential to isolate IO operations
//and execute them in a dedicated thread pool in order to make the main logic of the system asynchronous all the way down.
val ioService = Executors.newFixedThreadPool(4)
val computationService = Executors.newFixedThreadPool(4)
val ioThreadPool = ExecutionContext.fromExecutor(ioService)
val computationThreadPool = ExecutionContext.fromExecutor(computationService)
def runIO(task: Task): Future[Unit] = {
implicit val threadPool = ioThreadPool
Future {
task.execute("Buzz Lightyear")
} map {
case result => println(result)
}
}
def runComputation(task: Task): Future[Unit] = {
implicit val threadPool = computationThreadPool
Future {
task.execute("Puss in Boots")
} map {
case result => println(result)
}
}
val taskFutures = tasks map { task =>
if(task.isInstanceOf[ShortTask])
runIO(task)
else
runComputation(task)
}
import ExecutionContext.Implicits.global
Future.sequence(taskFutures) onComplete {
case _ =>
ioService.shutdown()
computationService.shutdown()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment