Skip to content

Instantly share code, notes, and snippets.

@erikvanoosten
Created May 7, 2023 09:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save erikvanoosten/166957289ebae159ba7e56c3aa3aea37 to your computer and use it in GitHub Desktop.
Save erikvanoosten/166957289ebae159ba7e56c3aa3aea37 to your computer and use it in GitHub Desktop.
package test
import zio._
import zio.stream.ZStream
import zio.internal.ExecutionMetrics
/**
 * Minimal reproducer: runs a small workflow on a runtime whose executor runs every submitted
 * task on the submitting thread, and prints the current thread at each step so that any thread
 * switch becomes visible in the output.
 */
object MinimalReproducer extends ZIOAppDefault {

  /**
   * A runtime layer that can be used to run everything on the thread of the caller.
   *
   * Provided by Adam Fraser in Discord:
   * https://discord.com/channels/629491597070827530/630498701860929559/1094279123880386590 but with cooperative
   * yielding enabled.
   */
  // Plain `private`: the original `private[internal]` qualifier named a scope that does not
  // enclose this object (the file is in package `test`), which is rejected by the compiler.
  private val SameThreadRuntimeLayer: ZLayer[Any, Nothing, Unit] = {
    val sameThreadExecutor = new Executor() {
      // No execution metrics are reported by this executor.
      override def metrics(implicit unsafe: Unsafe): Option[ExecutionMetrics] = None

      // Runs the task synchronously on the submitting thread and reports it as accepted.
      override def submit(runnable: Runnable)(implicit unsafe: Unsafe): Boolean = {
        runnable.run()
        true
      }
    }
    // The same executor is installed as both the default and the blocking executor.
    Runtime.setExecutor(sameThreadExecutor) ++ Runtime.setBlockingExecutor(sameThreadExecutor)
  }

  /**
   * Describes the thread that evaluates this call as `<id>: <name>`.
   *
   * Must stay a `def`: it is called inside each effect so that it reports the thread running
   * that particular step.
   */
  private def currentThreadInfo: String = {
    val thread = Thread.currentThread()
    s"${thread.getId}: ${thread.getName}"
  }

  /** Prints `label` followed by the description of the thread executing the effect. */
  private def printThread(label: String): Task[Unit] =
    ZIO.attempt(println(s"$label thread: $currentThreadInfo"))

  /**
   * The workflow under test: prints the current thread before the stream, before and after a
   * 10 ms sleep inside the stream, and after the stream has been drained.
   */
  val toRunOnOneThread: ZIO[Any, Throwable, Unit] =
    printThread("Starting") *>
      ZStream(1)
        .tap(_ => printThread("Tapping 1"))
        .tap(_ => ZIO.sleep(10.millis))
        .tap(_ => printThread("Tapping 2"))
        .runDrain *>
      printThread("Ending")

  /**
   * Builds a runtime from [[SameThreadRuntimeLayer]], prints the thread on which the workflow is
   * expected to stay, and then runs [[toRunOnOneThread]] synchronously on that runtime.
   */
  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] = {
    for {
      sameThreadRuntime <- ZIO.runtime[Any].provideLayer(SameThreadRuntimeLayer)
      _ <- ZIO.attempt {
             println(s"Expected thread: $currentThreadInfo")
             Unsafe.unsafe { implicit u =>
               // Blocks the current thread until the workflow finishes; rethrows on failure.
               sameThreadRuntime.unsafe.run(toRunOnOneThread).getOrThrowFiberFailure()
               ()
             }
           }
    } yield ()
  }
}
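// Produces the following output; note that after the sleep the steps run on the
// `zio-timer-1` thread instead of the expected caller thread: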
// Expected thread: 20 ZScheduler-Worker-7
// Starting thread: 20 ZScheduler-Worker-7
// Tapping 1 thread: 20 ZScheduler-Worker-7
// Tapping 2 thread: 25 zio-timer-1
// Ending thread: 25 zio-timer-1
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment