Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save malcolmgreaves/3e47b466dcbfa67a654a to your computer and use it in GitHub Desktop.
Save malcolmgreaves/3e47b466dcbfa67a654a to your computer and use it in GitHub Desktop.
import scala.concurrent._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random
import java.util.{Timer, TimerTask}
//shared helpers used by the concurrency pattern examples in this file
object Util {
  //the response types the example "servers" hand back
  sealed trait BaseResponse
  case class Response1(res: Int) extends BaseResponse
  case class Response2(res: String) extends BaseResponse

  //in real life, use an Akka scheduler or a Netty HashWheelTimer, because they are more thread-efficient.
  //the timer thread is a daemon so that it cannot, on its own, keep the JVM from exiting.
  private val timeoutTimer = new Timer("util-timeout-timer", true)

  //creates a future that never succeeds: it fails with a TimeoutException once `timeout` has elapsed.
  //  - a zero or negative timeout (including Duration.MinusInf) expires as soon as possible
  //  - a positive non-finite timeout (Duration.Inf) never expires, so the future never completes
  def timeoutFuture[T](timeout: Duration): Future[T] = {
    val prom = Promise[T]()
    //tryFailure never throws, so an exception can't escape and kill the shared timer thread
    def expire(): Unit = {
      prom.tryFailure(new TimeoutException(s"timed out after $timeout"))
      ()
    }
    timeout match {
      case finite: FiniteDuration =>
        val timeoutTask = new TimerTask {
          override def run(): Unit = expire()
        }
        //Timer.schedule rejects negative delays, so clamp them to "fire immediately"
        timeoutTimer.schedule(timeoutTask, math.max(0L, finite.toMillis))
      case _ if timeout > Duration.Zero =>
        //toMillis is not allowed on infinite durations; an infinite timeout simply never fires
        ()
      case _ =>
        expire()
    }
    prom.future
  }

  private val random = new Random(System.currentTimeMillis())
  private val possibleSleepMS = Vector(100L, 200L, 150L, 250L)

  //creates a future that sleeps for a pseudo-random amount of time, and then evaluates t.
  //both actions run on a thread of the implicit execution context in scope
  //(the global one, because we imported scala.concurrent.ExecutionContext.Implicits.global)
  def sleepAndThen[T](t: => T): Future[T] = {
    Future {
      //nextInt(n) is always in [0, n); math.abs(nextInt) % n goes negative for Int.MinValue
      val sleepLength = possibleSleepMS(random.nextInt(possibleSleepMS.length))
      //tell the execution context that this thread is about to block
      blocking {
        Thread.sleep(sleepLength)
      }
      t
    }
  }
}
//fan out/fan in pattern:
//execute a bunch of independent background jobs, each in their own future.
//merge them into one future that will be complete when the slowest one is.
def fanOut = {
  import Util._
  //start every job up front (in this order) so that they all run concurrently
  val jobs: List[Future[BaseResponse]] = List(
    sleepAndThen(Response1(1)),
    sleepAndThen(Response2("one")),
    sleepAndThen(Response1(123))
  )
  //fan in: one future holding all the responses, in the same order as the jobs
  val gathered: Future[List[BaseResponse]] = Future.sequence(jobs)
  for (responses <- gathered) yield {
    //every job has finished by now; work with `responses` here
    ()
  }
}
//timeout pattern
//race a request against a timer: the combined future fails with a TimeoutException if the timer finishes first
def timeout = {
  import Util._
  //the actual work, followed by a future that can only ever fail (after one second)
  val pendingRequest = sleepAndThen(Response1(1))
  val deadline = timeoutFuture[Response1](1.second)
  //whichever of the two completes first decides the outcome
  val winner = Future.firstCompletedOf(List(pendingRequest, deadline))
  for (resp <- winner) yield {
    //reached only when the request finished before the deadline
    ()
  }
}
//similar to timeouts - cancelling
//if you have an expensive operation, or maybe even one that runs forever,
//you can cancel it so it doesn't take up resources after we don't need it
def cancel = {
  import Util._
  //the variable we set to start the cancellation process
  val cancelVar = Promise[Unit]()
  //the variable that the cancelee sets to ack that it's fully done
  val cancelledVar = Promise[Unit]()
  //the handler for each request in our simple server
  def acceptRequest = ()
  //start the server.
  //completeWith acks the shutdown when the loop exits, and also passes a failure on to
  //the waiter if the loop throws, so nobody ends up waiting for an ack that never comes.
  cancelledVar.completeWith(Future {
    //the loop holds its thread until cancelled; mark it as blocking so the
    //execution context is allowed to compensate for the occupied thread
    blocking {
      while (!cancelVar.isCompleted) {
        acceptRequest
      }
    }
  })
  //stop the server after a while.
  //the returned future completes once the server has acked; we compose on the ack
  //future rather than spinning on isCompleted, which would burn a pool thread
  sleepAndThen {
    cancelVar.trySuccess(())
  }.flatMap(_ => cancelledVar.future)
}
//firstCompleted
//execute the same request on multiple servers, and return the first that comes back.
//combined with timeouts and cancellation you can tightly control your call
def firstCompleted = {
  import Util._
  //fire the same request three times; each element below is a separate, already running call
  val inFlight = List.fill(3)(sleepAndThen(Response1(1)))
  //completes with the outcome of whichever call finishes first
  val fastest: Future[Response1] = Future.firstCompletedOf(inFlight)
  for (resp <- fastest) yield {
    //continue with the response from the fastest server
    ()
  }
}
//channels
//allow *asynchronous* bidirectional communication between two concurrently executing processes.
//scala.concurrent has a synchronous version of this (SyncChannel)
def channels = {
  import Util._
  //a class that lets you send one message. you can compose these into a queue to send multiple messages
  class OneTimeChannel[T] {
    //completed by the sender with the message
    private val sendProm = Promise[T]()
    //completed by the receiver to ack that the message arrived
    private val recvProm = Promise[Unit]()

    //publishes t and returns a future that completes once a receiver has picked it up.
    //the channel is one-shot: a second send throws IllegalStateException.
    def send(t: T): Future[Unit] = {
      sendProm.success(t)
      recvProm.future
    }

    //returns a future of the sent message and acks the sender when it arrives.
    //trySuccess (not success) keeps recv safe to call more than once: a later call
    //finds the ack already set instead of failing its future with IllegalStateException.
    def recv: Future[T] = {
      sendProm.future.map { t =>
        recvProm.trySuccess(())
        t
      }
    }
  }

  val chan = new OneTimeChannel[Int]
  //sender
  sleepAndThen {
    val sent: Future[Unit] = chan.send(123)
    sent.map { _ =>
      //do stuff after we know the value has been received
      ()
    }
  }
  //receiver
  sleepAndThen {
    val receivedFuture: Future[Int] = chan.recv
    receivedFuture.map { received =>
      //do stuff with the received value
      ()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment