Skip to content

Instantly share code, notes, and snippets.

@fanf
Created March 25, 2020 14:00
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fanf/f39872886ee5b24c0d3e7af0c8171722 to your computer and use it in GitHub Desktop.
Save fanf/f39872886ee5b24c0d3e7af0c8171722 to your computer and use it in GitHub Desktop.
Testing batch process with ZIO. Concurrency is hard.
import zio._
import zio.clock._
import zio.duration._
import zio.test.environment._
import java.util.concurrent.TimeUnit
object Batch {
/*
* A batch is a forever looping running process (`forever`) forked in background
* and properly detached from its parent fiber (`forkDaemon`) that execute an
* effect each time a trigger is yielded.
*/
def createBatch(effect: ZIO[Clock, Nothing, Unit]): URIO[Clock, Fiber[Nothing, Nothing]] = {
// let's say that in our case, the trigger is just a "some time passed"
val trigger = UIO.unit.delay(5.minutes)
// the whole batch
(trigger *> effect).forever.forkDaemon
}
}
object MainTest1 {
/*
* Create a batch whose effect it to store current time in a `Ref` and write something out
*/
def demoBatch(r: Ref[List[Long]]) = Batch.createBatch(
// access what is needed from environment
ZIO.accessM[Clock]{ c => c.get.currentTime(TimeUnit.MINUTES).flatMap(t =>
// actual effect
UIO.effectTotal(println(s"done at $t")) *> r.update(t :: _)
) }
)
val prog1 = ZIO.accessM[Clock with TestClock] { testClock =>
def adjust(d: Duration) = testClock.get[TestClock.Service].adjust(d)
for {
r <- Ref.make(List.empty[Long])
_ <- demoBatch(r)
_ <- adjust(12.minutes)
l <- r.get
} yield l
}.provideLayer(testEnvironment)
def main(args: Array[String]): Unit = {
val l = Runtime.default.unsafeRun(prog1)
assert(l == 10 :: 5 :: Nil, s"Got unexpected execution list: ${l}")
}
}
object MainTest2 {
/*
* Create a batch whose effect it to store current time in a `Ref` and write something out
*/
def demoBatch(r: Ref[List[Long]]) = Batch.createBatch(
// access what is needed from environment
ZIO.accessM[Clock]{ c => c.get.currentTime(TimeUnit.MINUTES).flatMap(t =>
// actual effect
UIO.effectTotal(println(s"done at $t")) *> r.update(t :: _)
) }
)
val prog2 = ZIO.accessM[Clock with TestClock] { testClock =>
def adjust(d: Duration) = testClock.get[TestClock.Service].adjust(d)
for {
r <- Ref.make(List.empty[Long])
_ <- demoBatch(r)
_ <- adjust(12.minutes)
_ <- UIO.effectTotal(Thread.sleep(2000))
l <- r.get
} yield l
}.provideLayer(testEnvironment)
def main(args: Array[String]): Unit = {
val l = Runtime.default.unsafeRun(prog2)
assert(l == 10 :: 5 :: Nil, s"Got unexpected execution list: ${l}")
}
}
object MainTest3 {
/*
* Use a `Queue` to force synchronization between front and back fiber.
*/
def demoBatch(q: Queue[Long]) = Batch.createBatch(
// access what is needed from environment
ZIO.accessM[Clock]{ c => c.get.currentTime(TimeUnit.MINUTES).flatMap(t =>
// actual effect
UIO.effectTotal(println(s"done at $t")) *> q.offer(t).unit
) }
)
val prog3 = ZIO.accessM[Clock with TestClock] { testClock =>
def adjust(d: Duration) = testClock.get[TestClock.Service].adjust(d)
for {
q <- Queue.unbounded[Long]
_ <- demoBatch(q)
_ <- adjust(12.minutes)
l <- q.take.zip(q.take) // NOT `takeAll`, else you don't force synchronisation two times!
} yield l.productIterator.toList
}.provideLayer(testEnvironment)
def main(args: Array[String]): Unit = {
val l = Runtime.default.unsafeRun(prog3)
assert(l == 5 :: 10 :: Nil, s"Got unexpected execution list: ${l}")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment