Last active
October 13, 2024 16:18
-
-
Save mtsokol/0d6ab5473c04583899e3ffdcb7812959 to your computer and use it in GitHub Desktop.
Build your own Kafka in ZIO - Queues & Fibers
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zio._ | |
import zio.random._ | |
import zio.console._ | |
import zio.duration._ | |
object Main extends App { | |
override def run(args: List[String]) = program.exitCode | |
sealed trait Diagnostic | |
case object HipDiagnostic extends Diagnostic | |
case object KneeDiagnostic extends Diagnostic | |
case class Request[A](topic: Diagnostic, XRayImage: A) | |
trait RequestGenerator[R, A] { | |
def generate(topic: Diagnostic): URIO[R, Request[A]] | |
} | |
case class IntRequestGenerator() extends RequestGenerator[Random, Int] { | |
override def generate(topic: Diagnostic): URIO[Random, Request[Int]] = | |
nextIntBounded(1000) >>= (n => UIO(Request(topic, n))) | |
} | |
case class Consumer[A](title: String) { | |
def run = for { | |
queue <- Queue.bounded[A](10) | |
loop = for { | |
img <- queue.take | |
_ <- putStrLn(s"[$title] worker: Starting analyzing task $img") | |
rand <- nextIntBounded(4) | |
_ <- ZIO.sleep(rand.seconds) | |
_ <- putStrLn(s"[$title] worker: Finished task $img") | |
} yield () | |
fiber <- loop.forever.fork | |
} yield (queue, fiber) | |
} | |
object Consumer { | |
def create[A](title: String) = UIO(Consumer[A](title)) | |
} | |
case class TopicQueue[A](queue: Queue[A], subscribers: Ref[Map[Int, List[Queue[A]]]]) { | |
def subscribe(sub: Queue[A], consumerGroup: Int): UIO[Unit] = | |
subscribers.update { map => | |
map.get(consumerGroup) match { | |
case Some(value) => | |
map + (consumerGroup -> (value :+ sub)) | |
case None => | |
map + (consumerGroup -> List(sub)) | |
} | |
} | |
private val loop = | |
for { | |
elem <- queue.take | |
subs <- subscribers.get | |
_ <- ZIO.foreach(subs.values) { group => | |
for { | |
idx <- nextIntBounded(group.length) | |
_ <- group(idx).offer(elem) | |
} yield () | |
} | |
} yield () | |
def run = loop.forever.fork | |
} | |
object TopicQueue { | |
def create[A](queue: Queue[A]): UIO[TopicQueue[A]] = | |
Ref.make(Map.empty[Int, List[Queue[A]]]) >>= (map => UIO(TopicQueue(queue, map))) | |
} | |
case class Exchange[A]() { | |
def run = for { | |
jobQueue <- Queue.bounded[Request[A]](10) | |
queueHip <- Queue.bounded[A](10) | |
queueKnee <- Queue.bounded[A](10) | |
hipTopicQueue <- TopicQueue.create(queueHip) | |
kneeTopicQueue <- TopicQueue.create(queueKnee) | |
loop = for { | |
job <- jobQueue.take | |
_ <- job.topic match { | |
case HipDiagnostic => | |
queueHip.offer(job.XRayImage) | |
case KneeDiagnostic => | |
queueKnee.offer(job.XRayImage) | |
} | |
} yield () | |
fiber <- loop.forever.fork | |
} yield (jobQueue, hipTopicQueue, kneeTopicQueue, fiber) | |
} | |
object Exchange { | |
def create[A] = UIO(Exchange[A]()) | |
} | |
case class Producer[R, A](queue: Queue[Request[A]], generator: RequestGenerator[R, A]) { | |
def run = { | |
val loop = for { | |
_ <- putStrLn("[XRayRoom] generating hip and knee request") | |
hip <- generator.generate(HipDiagnostic) | |
_ <- queue.offer(hip) | |
knee <- generator.generate(KneeDiagnostic) | |
_ <- queue.offer(knee) | |
_ <- ZIO.sleep(2.seconds) | |
} yield () | |
loop.forever.fork | |
} | |
} | |
object Producer { | |
def create[R, A](queue: Queue[Request[A]], generator: RequestGenerator[R, A]) = UIO(Producer(queue, generator)) | |
} | |
val program = for { | |
physicianHip <- Consumer.create[Int]("Hip") | |
ctxPhHip <- physicianHip.run | |
(phHipQueue, phHipFiber) = ctxPhHip | |
loggerHip <- Consumer.create[Int]("HIP_LOGGER") | |
ctxLoggerHip <- loggerHip.run | |
(loggerHipQueue, _) = ctxLoggerHip | |
physicianKnee <- Consumer.create[Int]("Knee1") | |
ctxPhKnee <- physicianKnee.run | |
(phKneeQueue, _) = ctxPhKnee | |
physicianKnee2 <- Consumer.create[Int]("Knee2") | |
ctxPhKnee2 <- physicianKnee2.run | |
(phKneeQueue2, _) = ctxPhKnee2 | |
exchange <- Exchange.create[Int] | |
ctxExchange <- exchange.run | |
(inputQueue, outputQueueHip, outputQueueKnee, _) = ctxExchange | |
generator = IntRequestGenerator() | |
xRayRoom <- Producer.create(inputQueue, generator) | |
_ <- xRayRoom.run | |
_ <- outputQueueHip.subscribe(phHipQueue, consumerGroup = 1) | |
_ <- outputQueueHip.subscribe(loggerHipQueue, consumerGroup = 2) | |
_ <- outputQueueKnee.subscribe(phKneeQueue, consumerGroup = 1) | |
_ <- outputQueueKnee.subscribe(phKneeQueue2, consumerGroup = 1) | |
_ <- outputQueueHip.run | |
_ <- outputQueueKnee.run | |
_ <- phHipFiber.join | |
} yield () | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Blog post available here: https://scalac.io/build-your-own-kafka-in-zio-queues-fibers
For sbt build add in
build.sbt
: