Skip to content

Instantly share code, notes, and snippets.

@guersam
Created March 1, 2023 12:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save guersam/22cb2de7d9be63262c38e463c1484c60 to your computer and use it in GitHub Desktop.
Save guersam/22cb2de7d9be63262c38e463c1484c60 to your computer and use it in GitHub Desktop.
ZStream#throttleShape test
+ throttleShape
- delays chunks enough when burst == 0
✗ Result was false
res2.forall(i => 1000 <= i && i <= 2000)
res2 = List(294, 397, 499, 605, 710, 816, 922, 1025, 1131, 1235)
//> using scala "3.2.2"
//> using dep "dev.zio::zio:2.0.9"
//> using dep "dev.zio::zio-test:2.0.9"
import zio.*
import zio.stream.*
import zio.test.*
import java.time.temporal.ChronoUnit
object ThrottleShapeSpec extends ZIOSpecDefault {
val spec =
suite("throttleShape")(
test("delays chunks after limit when burst == 1") {
for {
start <- Clock.instant
fiber <- Queue
.bounded[Int](30)
.flatMap { queue =>
ZStream
.fromQueue(queue, maxChunkSize = 1)
.throttleShape(10, 1.second, 0)(_.size.toLong)
.mapZIOParUnordered(Int.MaxValue)(_ =>
Clock.instant.map(start.until(_, ChronoUnit.MILLIS))
)
.toPull
.flatMap { pull =>
for {
_ <- ZIO.foreachDiscard(1 to 30)(queue.offer)
res1 <- pull.replicateZIO(10).map(_.flatten)
_ <- assertTrue(res1.forall(i => 0 <= i && i <= 1000))
res2 <- pull.replicateZIO(10).map(_.flatten)
_ <- assertTrue(res2.forall(i => 1000 <= i && i <= 2000))
} yield assertCompletes
}
}
.fork
test <- fiber.join
} yield test
} @@ TestAspect.withLiveClock
)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment