Last active
May 12, 2022 19:34
-
-
Save khanetor/014db994e15ebb58757062a655b56b7f to your computer and use it in GitHub Desktop.
Task scheduler in Scala with Cats, Cats Effect, FS2, and FS2-cron
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt build definition for the task-scheduler demo.

// Both Monocle modules are pinned to one version so they cannot drift apart.
val monocleVersion = "3.1.0"

// Project identity and compiler version.
name := "task-scheduler"
version := "1.0.0-SNAPSHOT"
scalaVersion := "3.1.2"

// Runtime dependencies first, test-only dependencies last.
libraryDependencies ++= Seq(
  "org.typelevel" %% "cats-effect" % "3.3.11",
  "eu.timepit" %% "fs2-cron-calev" % "0.7.2",
  "dev.optics" %% "monocle-core" % monocleVersion,
  "dev.optics" %% "monocle-macro" % monocleVersion,
  "org.typelevel" %% "munit-cats-effect-3" % "1.0.6" % Test
)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.kha.main | |
import cats.effect.{IO, IOApp} | |
import fs2.Stream | |
import com.kha.models.TaskManager | |
import cats.effect.kernel.Ref | |
/** Interactive console front-end for the task scheduler.
  *
  * Starts a [[TaskManager]] pre-loaded with `existingTasks`, then repeatedly
  * reads a command from standard input until the user types `quit`.
  * Recognised commands are `add` and `remove`; anything else is ignored.
  */
object Main extends IOApp.Simple:
  import TaskManager.Task

  // Existing tasks, that can be loaded from database
  val existingTasks = List(
    Task("task-1", "Task 1 has been executed", "*-*-* *:*:0/1"),
    Task("task-2", "Task 2 has been executed", "*-*-* *:*:0/5"),
    Task("task-3", "Task 3 has been executed", "*-*-* *:*:0/10")
  )

  /** Prints a greeting and runs the command loop for as long as the task
    * manager resource is held; leaving the loop releases the resource.
    */
  override def run: IO[Unit] =
    IO.println("Welcome!") >>
      TaskManager.applyIO(existingTasks).use { taskManager =>
        Stream
          .repeatEval(IO.println("Please enter your command:") >> IO.readLine)
          .evalTap(command => IO.println(s"Command: $command"))
          // "quit" ends the stream before it reaches the dispatcher below.
          .takeWhile(_ != "quit")
          .evalTap(command => runApp(command, taskManager))
          .compile
          .drain
      }

  /** Dispatches a single command line to its handler.
    *
    * @param command     the raw line typed by the user
    * @param taskManager the manager the command operates on
    */
  def runApp(command: String, taskManager: TaskManager): IO[Unit] =
    command match {
      case "add"    => addTaskCommand(taskManager)
      case "remove" => removeTaskCommand(taskManager)
      case _        => IO.unit
    }

  /** Prompts for a task's name, message and schedule, then registers it.
    *
    * A failure while registering (for example a schedule string that cannot
    * be parsed) is reported on the console instead of being propagated, so a
    * single bad input does not terminate the interactive session.
    */
  def addTaskCommand(taskManager: TaskManager): IO[Unit] = for {
    _ <- IO.println("Task name:")
    taskName <- IO.readLine
    _ <- IO.println("Task message:")
    taskMsg <- IO.readLine
    _ <- IO.println("Task schedule:")
    schedule <- IO.readLine
    _ <- taskManager
      .addTask(Task(taskName, taskMsg, schedule))
      .handleErrorWith(err => IO.println(s"Could not add task '$taskName': ${err.getMessage}"))
  } yield ()

  /** Prompts for a task name and removes every task registered under it. */
  def removeTaskCommand(taskManager: TaskManager): IO[Unit] = for {
    _ <- IO.println("Task name:")
    taskName <- IO.readLine
    _ <- taskManager.removeTask(taskName)
  } yield ()
end Main
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.kha.models | |
import cats.effect.{Ref, IO, Resource} | |
import eu.timepit.fs2cron.calev.CalevScheduler | |
import eu.timepit.fs2cron.Scheduler | |
import fs2.Stream | |
import fs2.concurrent.SignallingRef | |
import com.github.eikek.calev.CalEvent | |
import monocle.syntax.all._ | |
import cats.syntax.all._ | |
// TODO: Use Supervisor, check usage of Ref | |
/** Keeps track of the currently scheduled tasks and lets callers add,
  * remove or replace them at runtime.
  *
  * Each running task is stored together with a stop handle (its
  * `signalHandler`); setting that handle to `true` interrupts the task's
  * schedule. Instances are created through `TaskManager.applyIO`.
  *
  * @param taskRef the list of currently registered (running) tasks
  */
final class TaskManager private (private val taskRef: Ref[IO, List[TaskManager.Task]]):
  import TaskManager.Task

  val scheduler = CalevScheduler.systemDefault[IO]

  // Upper bound on how many stop signals are raised concurrently.
  private val stopParallelism = 10

  /** Raises the stop signal of every task in `tasks` that carries one.
    * Tasks without a handle are skipped.
    */
  private def stopAll(tasks: List[Task]): IO[Unit] =
    IO.parSequenceN(stopParallelism)(tasks.flatMap(_.signalHandler).map(_.set(true))).void

  /** Replaces the managed tasks: stops and forgets everything currently
    * registered, then schedules `newTasks` in parallel.
    */
  def setTasks(newTasks: List[Task]): IO[Unit] =
    purge() >> newTasks.parTraverse(addTask).void

  /** Stops every registered task and clears the registry. */
  def purge(): IO[Unit] =
    // Swap the registry to empty atomically, then stop whatever was in it.
    taskRef.getAndSet(Nil).flatMap(stopAll)

  /** Schedules `task` and records it together with its stop handle.
    *
    * Starting the task and recording the handle run as one uncancelable
    * step: if cancellation landed between the two, the task would keep
    * running with no handle left to stop it.
    */
  def addTask(task: Task): IO[Unit] =
    (for {
      signal <- TaskManager.runTask(task, scheduler)
      _ <- taskRef.update { tasks =>
        task.focus(_.signalHandler).replace(signal.some) :: tasks
      }
    } yield ()).uncancelable

  /** Stops and unregisters every task whose name equals `taskName`.
    * Does nothing when no such task is registered.
    */
  def removeTask(taskName: String): IO[Unit] =
    taskRef
      .modify { tasks =>
        val (removed, kept) = tasks.partition(_.name == taskName)
        (kept, removed)
      }
      .flatMap(stopAll)
/** Companion of [[TaskManager]]: the task data model, the resource-safe
  * constructor, and the routine that actually schedules a task.
  */
object TaskManager:
  /** A schedule expression, kept as the raw string the user supplied. */
  type CronSchedule = String

  /** A job to run periodically.
    *
    * @param name          identifier used to remove the task later
    * @param message       text printed on every execution
    * @param schedule      schedule expression parsed by `CalEvent`
    * @param signalHandler when present, setting it to `true` stops future
    *                      executions; `None` for a task that is not running
    */
  case class Task(
      name: String,
      message: String,
      schedule: CronSchedule,
      signalHandler: Option[Ref[IO, Boolean]] = None
  )

  /** Builds a [[TaskManager]] as a resource and schedules `tasks` on it.
    *
    * The finalizer (which purges all tasks) is registered before any task is
    * started, so tasks that were already started are still stopped if
    * scheduling a later one fails.
    *
    * @param tasks tasks to schedule as soon as the manager exists
    */
  def applyIO(tasks: List[Task] = List.empty): Resource[IO, TaskManager] =
    val emptyManagerIO = for {
      _ <- IO.println("Preparing task manager")
      emptyTaskIOs <- Ref[IO].of(List.empty[Task])
    } yield TaskManager(emptyTaskIOs)
    Resource
      .make(emptyManagerIO)(tm => IO.println("Releasing task manager resources") *> tm.purge())
      .evalTap(_.setTasks(tasks))

  /** Starts `task` on a background fiber, printing its message on every
    * tick of its schedule, and returns the signal that stops it.
    *
    * @param task      the task to run; only `schedule` and `message` are read
    * @param scheduler produces a tick for every occurrence of the schedule
    * @return a signal, initially `false`; setting it to `true` interrupts the
    *         scheduled stream
    */
  def runTask(task: Task, scheduler: Scheduler[IO, CalEvent]): IO[SignallingRef[IO, Boolean]] = for {
    signal <- SignallingRef[IO, Boolean](false)
    // NOTE(review): CalEvent.unsafe presumably throws on an unparsable
    // schedule; being evaluated inside the IO chain, that would surface as a
    // failed IO rather than an escaping exception - confirm.
    time = CalEvent.unsafe(task.schedule)
    // A print that has begun is allowed to finish even if the stream is interrupted.
    execution = IO.println(task.message).uncancelable
    scheduledTask = scheduler.awakeEvery(time) >> Stream.eval(execution)
    // The fiber is intentionally not retained: the task is stopped through `signal`.
    _ <- scheduledTask.interruptWhen(signal).compile.drain.start
  } yield signal
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Job scheduling with FS2-cron
This is a demo of how to create a job scheduling system in which jobs can be interactively created and removed, each running on a periodic cron schedule. A job is encapsulated in the
TaskManager.Task
data structure. There is an optional signalHandler
in this structure that, if available, can be used to stop future job execution.