Skip to content

Instantly share code, notes, and snippets.

@amoilanen
Created June 30, 2021 21:25
Show Gist options
  • Save amoilanen/9b3157e3c5ba3af7e67c26bcc78b4000 to your computer and use it in GitHub Desktop.
Save amoilanen/9b3157e3c5ba3af7e67c26bcc78b4000 to your computer and use it in GitHub Desktop.
Full runnable example for the documentation https://zio.dev/docs/datatypes/stream/subscription-ref including correcting the type inference for IDEA
import java.io.IOException
import zio.{Chunk, RefM, Runtime, UIO, URIO, ZIO, console, random}
import zio.console.Console
import zio.random.Random
import zio.stream.{SubscriptionRef, ZStream}
// Full running example for the ZIO documentation https://zio.dev/docs/datatypes/stream/subscription-ref
object Subscription extends App {
val runtime: Runtime[zio.ZEnv] = Runtime.default
def server(ref: RefM[Long]): UIO[Nothing] =
ref.update[Any, Nothing](n => ZIO.succeed(n + 1)).forever
def client(changes: ZStream[Any, Nothing, Long]): URIO[Random, Chunk[Long]] =
for {
n <- random.nextLongBetween(1, 200)
chunk <- changes.take(n).runCollect
} yield chunk
val app: ZIO[Console with Random, IOException, Unit] = for {
subscriptionRef <- SubscriptionRef.make(0L)
server <- server(subscriptionRef.ref).fork
chunks <- ZIO.collectAllPar(List.fill(100)(client(subscriptionRef.changes)))
_ <- server.interrupt
_ <- ZIO.foreach(chunks)(chunk => console.putStrLn(chunk.toString))
} yield ()
print(runtime.unsafeRun(app))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment