Skip to content

Instantly share code, notes, and snippets.

@BalmungSan
Last active June 24, 2021 14:29
Show Gist options
  • Save BalmungSan/c8d7b47686052983f8e9818dd0cb5f7c to your computer and use it in GitHub Desktop.
Save BalmungSan/c8d7b47686052983f8e9818dd0cb5f7c to your computer and use it in GitHub Desktop.
HighLowPriorityRunner - A CE3 program to schedule high & low priority jobs to be run on a dedicated EC
// sbt build definition for the high/low priority runner example.
name := "low-high-priority-runner"
version := "1.0.0"
scalaVersion := "2.13.6"
// cats-effect 3 is the only library dependency of the example.
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.1.1"
// Run the application in a forked JVM instead of inside the sbt process.
run / fork := true
package example
import cats.effect.Async
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._
import scala.concurrent.ExecutionContext
/** Runs jobs taken from two queues, always preferring the high-priority queue.
  *
  * Each job is executed in its own fiber. The fiber picks its job only at the
  * moment it actually starts running, so a high-priority job offered late is
  * still chosen before low-priority jobs that were already waiting.
  */
object HighLowPriorityRunner {
  import scala.concurrent.duration._

  /** Settings of the runner.
    *
    * @param highPriorityJobs queue polled first for the next job.
    * @param lowPriorityJobs queue polled only when no high-priority job is available.
    * @param customEC when defined, the runner loop is evaluated on this
    *                 execution context (via `evalOn`); otherwise it runs wherever
    *                 the caller runs it.
    */
  final case class Config[F[_]](
    highPriorityJobs: Queue[F, F[Unit]],
    lowPriorityJobs: Queue[F, F[Unit]],
    customEC: Option[ExecutionContext]
  )

  /** How long the runner waits before polling again after finding both queues empty. */
  private val IdlePollInterval: FiniteDuration = 10.millis

  /** Builds the (never terminating) runner program.
    *
    * The returned effect loops forever; callers are expected to `start` it and
    * cancel the resulting fiber when the runner is no longer needed.
    *
    * @param config queues to consume and the optional execution context to use.
    * @return an effect that keeps dispatching jobs until it is canceled.
    */
  def apply[F[_]](config: Config[F])
                 (implicit F: Async[F]): F[Unit] = {
    // Polls the high-priority queue first and falls back to the low-priority one.
    val takeNextJob: F[Option[F[Unit]]] =
      config.highPriorityJobs.tryTake.flatMap {
        case None => config.lowPriorityJobs.tryTake
        case found => F.pure(found)
      }

    // Spawns a single worker fiber and waits only until that worker has polled
    // the queues (not until its job finishes). This gives the loop back-pressure:
    // at most one worker is waiting to poll at any time, instead of an unbounded
    // stream of fibers being spawned. When the worker found nothing, the loop
    // backs off for a short while rather than spinning on empty queues.
    val dispatchOne: F[Unit] =
      F.deferred[Boolean].flatMap { jobFound =>
        val worker: F[Unit] =
          takeNextJob.flatMap { maybeJob =>
            jobFound.complete(maybeJob.isDefined) *> maybeJob.getOrElse(F.unit)
          }

        worker.start *> jobFound.get.flatMap {
          case true => F.unit
          case false => F.sleep(IdlePollInterval)
        }
      }

    val loop = dispatchOne.foreverM.void

    config.customEC.fold(ifEmpty = loop)(ec => loop.evalOn(ec))
  }
}
package example
import cats.effect.{Async, IO, IOApp, Resource}
import cats.effect.std.Queue
import cats.effect.syntax.all._
import cats.syntax.all._
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
/** Entry point: runs [[Program]] on a dedicated two-thread pool. */
object Main extends IOApp.Simple {
  // Fixed pool of two threads exposed as an ExecutionContext; the underlying
  // executor is shut down when the resource is released.
  private val jobsContext: Resource[IO, ExecutionContext] =
    Resource
      .make(IO(Executors.newFixedThreadPool(2)))(pool => IO.blocking(pool.shutdown()))
      .map[ExecutionContext](pool => ExecutionContext.fromExecutor(pool))

  override final val run: IO[Unit] =
    jobsContext.use(jobsEC => Program[IO](jobsEC))
}
/** Demo program: feeds numbered jobs into a [[HighLowPriorityRunner]]. */
object Program {
  /** Creates a job that reports its start (with the current thread name),
    * blocks for one second and then reports that it finished.
    */
  private def createJob[F[_]](id: Int)(implicit F: Async[F]): F[Unit] = {
    // Real code should use Console instead of println.
    val announceStart =
      F.delay(println(s"Starting job ${id} on thread ${Thread.currentThread.getName}"))
    // Blocks the fiber on purpose - testing only; real code should use F.sleep.
    val simulateWork = F.delay(Thread.sleep(1.second.toMillis))
    val announceEnd = F.delay(println(s"Finished job ${id}!"))

    announceStart *> simulateWork *> announceEnd
  }

  /** Starts the runner on `customEC`, enqueues ten high-priority jobs (0-9) and
    * five low-priority jobs (10-14), waits five seconds, enqueues five more
    * high-priority jobs (15-19) and finally joins the runner fiber.
    *
    * @param customEC execution context handed to the runner.
    */
  def apply[F[_]](customEC: ExecutionContext)(implicit F: Async[F]): F[Unit] = {
    // Offers one freshly created job per id, in ascending id order.
    def submit(queue: Queue[F, F[Unit]], ids: Range): F[Unit] =
      ids.toList.traverse_(jobId => queue.offer(createJob[F](jobId)))

    for {
      urgent <- Queue.unbounded[F, F[Unit]]
      background <- Queue.unbounded[F, F[Unit]]
      runner <- HighLowPriorityRunner(
        HighLowPriorityRunner.Config(urgent, background, Some(customEC))
      ).start
      _ <- submit(urgent, 0 until 10)
      _ <- submit(background, 10 until 15)
      _ <- F.sleep(5.seconds)
      _ <- submit(urgent, 15 until 20)
      _ <- runner.join.void
    } yield ()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment