Skip to content

Instantly share code, notes, and snippets.

@Zuchos
Last active May 26, 2020 13:13
Show Gist options
  • Save Zuchos/be215e25e78066db0cddfafe83d3cddd to your computer and use it in GitHub Desktop.
Save Zuchos/be215e25e78066db0cddfafe83d3cddd to your computer and use it in GitHub Desktop.
import akka.actor.testkit.typed.scaladsl.ActorTestKit
import akka.stream.scaladsl.{Sink, Source}
import org.scalatest.flatspec.AnyFlatSpecLike
import scala.concurrent.Future
//akka 2.6.5
//zio 1.0.3.5-RC
/** Manual reproduction scenario: two ZIO streams subscribe, one after the other, to a single
  * fan-out Akka Streams publisher, and the second subscriber stops early via `takeWhile`.
  *
  * The test contains no assertions; its outcome is observed through the `println` output of the
  * source, of both subscribers, and of their exit callbacks.
  */
class PublisherTest extends AnyFlatSpecLike with org.scalatest.BeforeAndAfterAll {
  lazy val testKit: ActorTestKit = ActorTestKit("testKit123")

  // Despite its name this is the test kit's actor system; it is the implicit in scope for
  // `runWith` below (presumably converted to a Materializer by Akka -- confirm).
  implicit lazy val mat: akka.actor.typed.ActorSystem[Nothing] = testKit.system
  import zio.interop.reactivestreams._

  /** Shuts the test kit down once the suite has finished so that its actor system does not
    * outlive the test run.
    */
  override protected def afterAll(): Unit =
    try testKit.shutdownTestKit()
    finally super.afterAll()

  it should "cancel subscription" in {
    //Given
    val runtime = zio.Runtime.default
    val numbers = Source(1 to 500)
    val publisher = numbers
      .mapAsync(1) { n =>
        println(s"Source $n")
        // Deliberately blocks for 10 ms per element to simulate a slow upstream.
        Thread.sleep(10)
        Future.successful(n)
      }
      .runWith(Sink.asPublisher(fanout = true))

    // Primary subscriber: drains everything it receives, without any early termination.
    runtime.unsafeRunAsync(
      publisher
        .toStream(16)
        .map { n =>
          println(s"Primary subscriber: $n")
          n
        }
        .run(zio.stream.Sink.drain)
    )(exit => println(s"PRIMARY EXITING! $exit"))

    //When
    // Let the primary subscriber run alone for two seconds before the second one joins.
    Thread.sleep(2000)

    // Second subscriber: joins late and finishes at the first element that is not below 300.
    runtime.unsafeRunAsync(
      publisher
        .toStream(2)
        .takeWhile(_ < 300)
        .map { n =>
          println(s"ZIO 1: $n")
          n
        }
        .run(zio.stream.Sink.drain)
    )(exit => println(s"1 EXITING! $exit"))

    // Both streams run asynchronously, so keep the test alive long enough to watch the output.
    Thread.sleep(20 * 1000)
    //Then
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment