Skip to content

Instantly share code, notes, and snippets.

@ayeo
Last active March 11, 2022 12:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ayeo/8d7b725a26ce625a1685d8538214111a to your computer and use it in GitHub Desktop.
Save ayeo/8d7b725a26ce625a1685d8538214111a to your computer and use it in GitHub Desktop.
package pl.ayeo
import cats.effect.{IO, IOApp, Ref}
import cats.effect.std.Queue
import cats.effect.unsafe.implicits.global
import cats.syntax.all.*
import fs2.{Pipe, Stream}
import scala.concurrent.duration.*
import scala.util.Random
object ProcessManagerApp extends IOApp.Simple {
object Protocol {
abstract sealed class Message
abstract sealed class Event extends Message
case class BulkReceived(quantity: Int) extends Event
case class Status(quantity: Int) extends Event
abstract sealed class Command extends Message
case class Add(quantity: Int) extends Command
case class Reset(quantity: Int) extends Command
case class Check() extends Command
}
import Protocol._
case class Aggregate(r: Ref[IO, Int]) {
def handle(command: Command): IO[Option[Event]] = command match { //todo events list
case Add(quantity) =>
r.updateAndGet(_ + quantity).map { c =>
if (c > 0 && c % 10 == 0) Some(BulkReceived(c))
else None
}
case Reset(quantity) => r.getAndUpdate(_ - quantity).map(_ => None)
case Check() => r.get.map(c => Some(Status(c)))
case _ => IO(None)
}
}
case class ProcessManager() {
def handle(event: Event): Option[Command] = event match {
case BulkReceived(quantity) => Some(Reset(Math.round(quantity/2)))
case _ => None
}
}
def pmPipe(processManager: ProcessManager, commands: Queue[IO, Command]): Pipe[IO, Event, Event] = _.evalMap { i =>
processManager.handle(i).map(commands.offer).sequence >> IO(i)
}
def aggregatePipe(q: Queue[IO, Event], aggregate: Aggregate): Pipe[IO, Command, Command] = _.evalMap { input =>
for {
oi <- aggregate.handle(input)
_ <- oi.map(q.offer).sequence
} yield input
}
val ioAggregate: IO[Aggregate] = Ref[IO].of(0).map(Aggregate(_))
val inboxQueue: IO[Queue[IO, Command]] = Queue.unbounded[IO, Command]
val outboxQueue: IO[Queue[IO, Event]] = Queue.unbounded[IO, Event]
val finalStream: Stream[IO, (Aggregate, Queue[IO, Command], Queue[IO, Event])] = Stream.eval {
for {
a <- ioAggregate
i <- inboxQueue
o <- outboxQueue
} yield (a, i , o)
}
val addStream: Stream[IO, Command] = Stream.repeatEval(IO.sleep(5.millis) >> IO(Add(2)))
val checkStream: Stream[IO, Command] = Stream.repeatEval(IO.sleep(5.millis) >> IO(Check()))
val program: Stream[IO, Message] = finalStream.flatMap {
(aggregate, commands, events) =>
Stream.fromQueueUnterminated(commands)
.merge(addStream)
.merge(checkStream)
.through(aggregatePipe(events, aggregate))
.merge(Stream.fromQueueUnterminated(events).through(pmPipe(ProcessManager(), commands)))
}
override def run: IO[Unit] =
program.through(_.evalMap(t => IO.println(s"${Thread.currentThread().getName()}:\t $t"))).compile.drain
/* Output
io-compute-6: Check()
io-compute-3: Status(0)
io-compute-7: Add(2)
io-compute-8: Check()
io-compute-5: Status(2)
io-compute-1: Add(2)
io-compute-3: Check()
io-compute-9: Status(4)
io-compute-10: Add(2)
io-compute-7: Check()
io-compute-10: Status(6)
io-compute-3: Check()
io-compute-8: Status(6)
io-compute-8: Add(2)
io-compute-5: Check()
io-compute-4: Status(8)
io-compute-5: Add(2)
io-compute-8: BulkReceived(10)
io-compute-1: Check()
io-compute-8: Status(10)
io-compute-2: Reset(5)
io-compute-7: Check()
io-compute-1: Status(5)
io-compute-7: Add(2)
io-compute-4: Check()
io-compute-2: Status(7)
...
*/
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment