Skip to content

Instantly share code, notes, and snippets.

@frabbit
Last active July 26, 2022 10:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save frabbit/916438066efb530452b4685088670798 to your computer and use it in GitHub Desktop.
Save frabbit/916438066efb530452b4685088670798 to your computer and use it in GitHub Desktop.
This gist shows 2 test cases for creating a kafka consumer in ZIO 2.0 via acquireRelease. While the first example hangs, the second example works as expected. I had to use ZIO.interruptible in the second example to get it working. The question is why ZIO.interruptible is required and if it's a good solution at all.
import zio.*
import zio.test.*
import zio.test.Assertion.{equalTo, assertion}
import com.dimafeng.testcontainers.KafkaContainer
import zio.kafka.consumer.ConsumerSettings
import zio.kafka.producer.ProducerSettings
import zio.kafka.consumer.Consumer
import io.github.scottweaver.zio.testcontainers.kafka.ZKafkaContainer
import zio.kafka.consumer.Consumer.OffsetRetrieval
import zio.kafka.consumer.Consumer.AutoOffsetStrategy
import zio.Cause
object ScopeSpec extends ZIOSpecDefault:
val fullSpec = suite("Scope1Spec") {
type Deps = KafkaContainer & ConsumerSettings
def consumerLayer:ZLayer[ConsumerSettings, Throwable, Consumer] =
val acquire =
for {
clientId <- Random.nextUUID.map(_.toString())
cfg <- ZIO.service[ConsumerSettings]
result <- Consumer.make(cfg.withClientId(clientId).withGroupId(clientId))
} yield result
val release = (c:Consumer) =>
printLine("killConsumer")
*> c.stopConsumption
*> c.unsubscribe.ignore
*> printLine("killConsumer done")
ZLayer.scoped(ZIO.acquireRelease(acquire)(release) <* printLine("afterAcquireRelease"))
test("acquiring and releasing a kafka consumer should work as expected") {
val app:ZIO[Deps & Consumer, Throwable, Unit] = ZIO.unit
assertZIO(
printLine("scoped start") *>
app.provideSomeLayer[Deps](consumerLayer)
<* printLine("scoped end")
)(equalTo(()))
}
}.provideSomeShared[TestConfig](
ZKafkaContainer.Settings.default,
ZKafkaContainer.live,
ZKafkaContainer.defaultConsumerSettings
.map(_.update(_.withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest)))),
) @@ TestAspect.withLiveEnvironment
@@ TestAspect.sequential
def spec = fullSpec
// when executed this prints
// + Scope1Spec
// scoped start
// afterAcquireRelease
// killConsumer
// killConsumer done
// (hangs)
import zio.*
import zio.test.*
import zio.test.Assertion.{equalTo, assertion}
import com.dimafeng.testcontainers.KafkaContainer
import zio.kafka.consumer.ConsumerSettings
import zio.kafka.producer.ProducerSettings
import zio.kafka.consumer.Consumer
import io.github.scottweaver.zio.testcontainers.kafka.ZKafkaContainer
import zio.kafka.consumer.Consumer.OffsetRetrieval
import zio.kafka.consumer.Consumer.AutoOffsetStrategy
import zio.Cause
object Scope2Spec extends ZIOSpecDefault:
val fullSpec = suite("Scope2Spec") {
type Deps = KafkaContainer & ConsumerSettings
def consumerLayer:ZLayer[ConsumerSettings, Throwable, Consumer] =
val acquire =
for {
clientId <- Random.nextUUID.map(_.toString())
cfg <- ZIO.service[ConsumerSettings]
result <- Consumer.make(cfg.withClientId(clientId).withGroupId(clientId))
} yield result
val release = (c:Consumer) =>
printLine("killConsumer")
*> c.stopConsumption
*> c.unsubscribe.ignore
*> printLine("killConsumer done")
ZLayer.scoped(ZIO.acquireRelease(ZIO.interruptible(acquire))(release) <* printLine("afterAcquireRelease"))
test("acquiring and releasing a kafka consumer should work as expected") {
val app:ZIO[Deps & Consumer, Throwable, Unit] = ZIO.unit
assertZIO(
printLine("scoped start") *>
app.provideSomeLayer[Deps](consumerLayer)
<* printLine("scoped end")
)(equalTo(()))
}
}.provideSomeShared[TestConfig](
ZKafkaContainer.Settings.default,
ZKafkaContainer.live,
ZKafkaContainer.defaultConsumerSettings
.map(_.update(_.withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest)))),
) @@ TestAspect.withLiveEnvironment
@@ TestAspect.sequential
def spec = fullSpec
// when executed this prints successfully
// + Scope2Spec
// scoped start
// afterAcquireRelease
// killConsumer
// killConsumer done
// scoped end
// + acquiring and releasing a kafka consumer should work as expected
// Execution took 0ms
// 1 tests, 1 passed
// All tests in ScopeSpec passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment