Skip to content

Instantly share code, notes, and snippets.

@dacr
Last active May 27, 2023 06:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dacr/4ff8b96c05ef238b24a8fdf041221ac6 to your computer and use it in GitHub Desktop.
Save dacr/4ff8b96c05ef238b24a8fdf041221ac6 to your computer and use it in GitHub Desktop.
scala asynchronous operations / published by https://github.com/dacr/code-examples-manager #37088128-23be-4c5b-a15d-ab898ea131ce/94889539efa3643d0363eb031f7b0ad5c7b1cf08
// summary : scala asynchronous operations
// keywords : scala, asynchronous, learning, futures, @testable
// publish : gist
// authors : David Crosson
// license : Apache NON-AI License Version 2.0 (https://raw.githubusercontent.com/non-ai-licenses/non-ai-licenses/main/NON-AI-APACHE2)
// id : 37088128-23be-4c5b-a15d-ab898ea131ce
// created-on : 2020-09-06T20:35:57Z
// managed-by : https://github.com/dacr/code-examples-manager
// run-with : scala-cli $file
// ---------------------
//> using scala "3.3.0"
//> using dep "org.scalatest::scalatest:3.2.16"
//> using objectWrapper
// ---------------------
import org.scalatest._, flatspec._, matchers._, OptionValues._, concurrent._
import scala.concurrent.{Future, Promise, Await}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
class ScalaLanguageAsyncBasicsTest extends AsyncFlatSpec with should.Matchers with ScalaFutures {
override def suiteName: String = "Scala Language Asynchronous Basics"
implicit override def executionContext = scala.concurrent.ExecutionContext.Implicits.global
implicit val defaultPatience: PatienceConfig = {
import org.scalatest.time._
PatienceConfig(timeout = Span(100, Millis), interval = Span(5, Millis))
}
// ==========================================================================
"Promise" can "be used to provide a future result " in {
val myGiftPromise = Promise[String]()
Future(myGiftPromise.success("gift"))
myGiftPromise.future.map { gift =>
gift shouldBe "gift"
}
}
it can "fail" in {
val myGiftPromise = Promise[String]()
Future(myGiftPromise.failure(new Exception("you don't deserve it")))
info("myGiftPromise.future.failed :")
info("The returned Future will be successfully completed with the Throwable of the original Future if the original Future fails.")
info("If the original Future is successful, the returned Future is failed with a NoSuchElementException.")
myGiftPromise.future.failed.map { ex =>
ex.getMessage should include regex "deserve"
}
}
// ==========================================================================
"Future" can "be created" in {
Future(1)
.map(_ shouldBe 1)
}
// -------------------------------------------------
it can "complete" in {
val f = Future(1)
whenReady(f) { _ => // scalatest in async mode requires a future[Assertion] to be returned
f.isCompleted shouldBe true
}
}
// -------------------------------------------------
it can "complete in success" in {
val f = Future(1)
whenReady(f) { _ => // scalatest in async mode requires a future[Assertion] to be returned
f.value.filter(_.isSuccess) shouldBe defined
}
}
// -------------------------------------------------
it can "complete in failure" in {
val f = Future(throw new Exception("boom"))
f.failed.map { ex => // scalatest in async mode requires a future[Assertion] to be returned
ex.getMessage shouldBe "boom"
}
}
// -------------------------------------------------
it can "wait for completion" in {
val f = Future("hello")
Await.ready(f, 10.millis)
info("Await.ready : OF COURSE DON'T USE IT !")
Future(()).map(_ => f.isCompleted shouldBe true) // Because scalatest want a future return value... (of course)
}
// -------------------------------------------------
it can "wait for result" in {
val f = Future("hello")
val r = Await.result(f, 10.millis)
info("Await.result : OF COURSE DON'T USE IT !")
Future(()).map(_ => r shouldBe "hello") // Because scalatest want a future return value... (of course)
}
// -------------------------------------------------
it can "be badly composed" in {
def f1() = Future(1)
def f2() = Future(2)
val futureResult = for {
r1 <- f1()
r2 <- f2() // will start only once f1 has finished
} yield r1 + r2
futureResult.map(_ shouldBe 3)
}
// -------------------------------------------------
it can "be better composed" in {
val f1 = Future(1)
val f2 = Future(2)
val futureResult = for {
r1 <- f1
r2 <- f2
} yield r1 + r2
futureResult.map(_ shouldBe 3)
}
// -------------------------------------------------
it can "allow transparent operation" in {
Future("A")
.andThen { case Success(value) => value + "B" }
.map(_ shouldBe "A")
}
// -------------------------------------------------
it should "recover from failure" in {
Future(throw new Exception("BAD"))
.recover { _ => "A" }
.map(_ shouldBe "A")
}
// ==========================================================================
"Futures" can "be possible to react when all futures are completed successfully" in {
val futures: Seq[Future[Int]] = Seq(Future(1), Future(2), Future(3))
info("Take care here as all futures are 'started'")
val future: Future[Seq[Int]] = Future.sequence(futures)
future.map(_.sum shouldBe 6)
}
// -------------------------------------------------
it can "be possible to get the first completed but they are all started" in {
val futures: Seq[Future[Int]] = Seq(Future(1), Future(2), Future(3))
info("Take care here as all futures are 'started'")
val future: Future[Int] = Future.firstCompletedOf(futures)
future.map(_ should be > 0)
}
// -------------------------------------------------
it can "be possible to get the first completed event with lazy list (just a check)" in {
def slowish(delayMs: Int, value: Int): Int = {
Thread.sleep(delayMs) // Of course never use sleep :)
value
}
val futures: LazyList[Future[Int]] = {
Future(slowish(10, 1)) #::
Future(slowish(5, 2)) #::
Future(slowish(1, 3)) #::
LazyList.empty
}
val future: Future[Int] = Future.firstCompletedOf(futures)
future.map(_ shouldBe 3)
}
// -------------------------------------------------
it should "execute sequentially what ever their result states" in {
val p0 = Promise[String]()
def f1 = Future(throw new Exception("err1"))
def f2 = Future(p0.success("success"))
info("f1 andThen f2 => always execute f2 but always return f1")
f1.andThen(_ => f2)
p0.future.map(result => result shouldBe "success")
}
// -------------------------------------------------
it should "execute sequentially but stops on first on error" in {
val p0 = Promise[String]()
def f1 = Future(throw new Exception("err1"))
def f2 = Future(p0.success("good"))
f1.map(_ => f2)
Future {
Thread.sleep(100)
p0.future.isCompleted shouldBe false
}
}
// -------------------------------------------------
val checkLimit = 1_000_000
val checkGroupSize = 1_000
val checkExpectedResult = LazyList.from(0).take(checkLimit).map(v => BigInt(v)).sum
it should "be possible to execute a lot of futures sequentially#1" in {
def someGroupedCompute(values: Iterable[Int]) = Future(BigInt(values.sum))
note("it works without any memory issues, was trying to reproduce an issue seen elsewhere...")
val futureResult =
LazyList
.from(0)
.take(checkLimit)
.grouped(checkGroupSize)
.foldLeft(someGroupedCompute(Nil)) { (prevFuture, nextGroup) =>
prevFuture.flatMap( sumA => someGroupedCompute(nextGroup).map(sumB => sumA+sumB))
}
futureResult.map(result => result shouldBe checkExpectedResult)
}
it should "be possible to execute a lot of futures sequentially#2" in {
def someGroupedCompute(values: Iterable[Int]) = Future(BigInt(values.sum))
note("it works without any memory issues, was trying to reproduce an issue seen elsewhere...")
def generator(from:Int) = new Iterator[Int]() {
var current = from
override def hasNext = true
override def next() = {
val prev = current
current+=1
prev
}
}
val futureResult =
generator(0)
.to(LazyList)
.take(checkLimit)
.grouped(checkGroupSize)
.foldLeft(someGroupedCompute(Nil)) { (prevFuture, nextGroup) =>
prevFuture.flatMap( sumA => someGroupedCompute(nextGroup).map(sumB => sumA+sumB))
}
futureResult.map(result => result shouldBe checkExpectedResult)
}
it should "be possible to execute a lot of futures sequentially#3" in {
def someGroupedCompute(values: Iterable[Int]) = Future(BigInt(values.sum))
note("THIS has been a good workaround to a seen issue with flatmap (linked to java future conversion ?)")
val futureResult =
LazyList
.from(0)
.take(checkLimit)
.grouped(checkGroupSize)
.map(someGroupedCompute)
.reduceLeft{ (futureA,futureB) =>
for {
resultA <- futureA
resultB <- futureB
} yield resultA + resultB
}
futureResult.map(result => result shouldBe checkExpectedResult)
}
// ==========================================================================
"Executors" can "be customized" in {
implicit val customexecutor = {
import scala.concurrent.ExecutionContext
//ExecutionContext.Implicits.global // the default one
//ExecutionContext.fromExecutor(new java.util.concurrent.ForkJoinPool(4))
ExecutionContext.fromExecutor(java.util.concurrent.Executors.newFixedThreadPool(4))
//ExecutionContext.fromExecutor(java.util.concurrent.Executors.newWorkStealingPool(4))
//ExecutionContext.fromExecutor(java.util.concurrent.Executors.newSingleThreadExecutor())
//ExecutionContext.fromExecutor(java.util.concurrent.Executors.newScheduledThreadPool(4))
//ExecutionContext.fromExecutor(java.util.concurrent.Executors.newCachedThreadPool())
}
val fut = Future(1)::Future(2)::Future(3)::Nil
Future.sequence(fut) map {f =>
f should have size 3
f.sum shouldBe 6
}
}
// ==========================================================================
}
org.scalatest.tools.Runner.main(Array("-oDF", "-s", classOf[ScalaLanguageAsyncBasicsTest].getName))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment