Skip to content

Instantly share code, notes, and snippets.

@mtsokol
Last active August 29, 2021 14:23
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 3 You must be signed in to fork a gist
  • Save mtsokol/0d6ab5473c04583899e3ffdcb7812959 to your computer and use it in GitHub Desktop.
Save mtsokol/0d6ab5473c04583899e3ffdcb7812959 to your computer and use it in GitHub Desktop.
Build your own Kafka in ZIO - Queues & Fibers
import zio._
import zio.random._
import zio.console._
import zio.duration._
object Main extends App {
override def run(args: List[String]) = program.exitCode
sealed trait Diagnostic
case object HipDiagnostic extends Diagnostic
case object KneeDiagnostic extends Diagnostic
case class Request[A](topic: Diagnostic, XRayImage: A)
trait RequestGenerator[R, A] {
def generate(topic: Diagnostic): URIO[R, Request[A]]
}
case class IntRequestGenerator() extends RequestGenerator[Random, Int] {
override def generate(topic: Diagnostic): URIO[Random, Request[Int]] =
nextIntBounded(1000) >>= (n => UIO(Request(topic, n)))
}
case class Consumer[A](title: String) {
def run = for {
queue <- Queue.bounded[A](10)
loop = for {
img <- queue.take
_ <- putStrLn(s"[$title] worker: Starting analyzing task $img")
rand <- nextIntBounded(4)
_ <- ZIO.sleep(rand.seconds)
_ <- putStrLn(s"[$title] worker: Finished task $img")
} yield ()
fiber <- loop.forever.fork
} yield (queue, fiber)
}
object Consumer {
def create[A](title: String) = UIO(Consumer[A](title))
}
case class TopicQueue[A](queue: Queue[A], subscribers: Ref[Map[Int, List[Queue[A]]]]) {
def subscribe(sub: Queue[A], consumerGroup: Int): UIO[Unit] =
subscribers.update { map =>
map.get(consumerGroup) match {
case Some(value) =>
map + (consumerGroup -> (value :+ sub))
case None =>
map + (consumerGroup -> List(sub))
}
}
private val loop =
for {
elem <- queue.take
subs <- subscribers.get
_ <- ZIO.foreach(subs.values) { group =>
for {
idx <- nextIntBounded(group.length)
_ <- group(idx).offer(elem)
} yield ()
}
} yield ()
def run = loop.forever.fork
}
object TopicQueue {
def create[A](queue: Queue[A]): UIO[TopicQueue[A]] =
Ref.make(Map.empty[Int, List[Queue[A]]]) >>= (map => UIO(TopicQueue(queue, map)))
}
case class Exchange[A]() {
def run = for {
jobQueue <- Queue.bounded[Request[A]](10)
queueHip <- Queue.bounded[A](10)
queueKnee <- Queue.bounded[A](10)
hipTopicQueue <- TopicQueue.create(queueHip)
kneeTopicQueue <- TopicQueue.create(queueKnee)
loop = for {
job <- jobQueue.take
_ <- job.topic match {
case HipDiagnostic =>
queueHip.offer(job.XRayImage)
case KneeDiagnostic =>
queueKnee.offer(job.XRayImage)
}
} yield ()
fiber <- loop.forever.fork
} yield (jobQueue, hipTopicQueue, kneeTopicQueue, fiber)
}
object Exchange {
def create[A] = UIO(Exchange[A]())
}
case class Producer[R, A](queue: Queue[Request[A]], generator: RequestGenerator[R, A]) {
def run = {
val loop = for {
_ <- putStrLn("[XRayRoom] generating hip and knee request")
hip <- generator.generate(HipDiagnostic)
_ <- queue.offer(hip)
knee <- generator.generate(KneeDiagnostic)
_ <- queue.offer(knee)
_ <- ZIO.sleep(2.seconds)
} yield ()
loop.forever.fork
}
}
object Producer {
def create[R, A](queue: Queue[Request[A]], generator: RequestGenerator[R, A]) = UIO(Producer(queue, generator))
}
val program = for {
physicianHip <- Consumer.create[Int]("Hip")
ctxPhHip <- physicianHip.run
(phHipQueue, phHipFiber) = ctxPhHip
loggerHip <- Consumer.create[Int]("HIP_LOGGER")
ctxLoggerHip <- loggerHip.run
(loggerHipQueue, _) = ctxLoggerHip
physicianKnee <- Consumer.create[Int]("Knee1")
ctxPhKnee <- physicianKnee.run
(phKneeQueue, _) = ctxPhKnee
physicianKnee2 <- Consumer.create[Int]("Knee2")
ctxPhKnee2 <- physicianKnee2.run
(phKneeQueue2, _) = ctxPhKnee2
exchange <- Exchange.create[Int]
ctxExchange <- exchange.run
(inputQueue, outputQueueHip, outputQueueKnee, _) = ctxExchange
generator = IntRequestGenerator()
xRayRoom <- Producer.create(inputQueue, generator)
_ <- xRayRoom.run
_ <- outputQueueHip.subscribe(phHipQueue, consumerGroup = 1)
_ <- outputQueueHip.subscribe(loggerHipQueue, consumerGroup = 2)
_ <- outputQueueKnee.subscribe(phKneeQueue, consumerGroup = 1)
_ <- outputQueueKnee.subscribe(phKneeQueue2, consumerGroup = 1)
_ <- outputQueueHip.run
_ <- outputQueueKnee.run
_ <- phHipFiber.join
} yield ()
}
@mtsokol
Copy link
Author

mtsokol commented Sep 18, 2019

Blog post available here: https://scalac.io/build-your-own-kafka-in-zio-queues-fibers

For sbt build add in build.sbt:

libraryDependencies += "dev.zio" %% "zio" % "1.0.3"

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment