Skip to content

Instantly share code, notes, and snippets.

@khanetor
Last active May 12, 2022 19:34
Show Gist options
  • Save khanetor/014db994e15ebb58757062a655b56b7f to your computer and use it in GitHub Desktop.
Save khanetor/014db994e15ebb58757062a655b56b7f to your computer and use it in GitHub Desktop.
Task scheduler in Scala with Cats, Cats Effect, FS2, and FS2-cron
// sbt build definition for the task-scheduler demo.
// Dependency versions are named once so related artifacts stay in lock-step.
val catsEffectVersion = "3.3.11"
val fs2CronVersion = "0.7.2"
val monocleVersion = "3.1.0"
val munitCatsEffectVersion = "1.0.6"

name := "task-scheduler"
version := "1.0.0-SNAPSHOT"
scalaVersion := "3.1.2"

libraryDependencies ++= Seq(
  // Effect runtime (IO, Ref, Resource)
  "org.typelevel" %% "cats-effect" % catsEffectVersion,
  // Cron-like scheduling of fs2 streams using calev calendar events
  "eu.timepit" %% "fs2-cron-calev" % fs2CronVersion,
  // Optics used to update fields of the Task case class
  "dev.optics" %% "monocle-core" % monocleVersion,
  "dev.optics" %% "monocle-macro" % monocleVersion,
  // Test-only: MUnit integration for cats-effect
  "org.typelevel" %% "munit-cats-effect-3" % munitCatsEffectVersion % Test
)
package com.kha.main
import cats.effect.{IO, IOApp}
import fs2.Stream
import com.kha.models.TaskManager
import cats.effect.kernel.Ref
/** Interactive console front-end for the task scheduler.
  *
  * Starts a [[TaskManager]] pre-loaded with `existingTasks`, then reads commands
  * from standard input until the user types `quit`:
  *
  *   - `add`    prompts for a name, message and schedule and schedules a new task
  *   - `remove` prompts for a name and stops every task with that name
  *   - anything else is ignored
  */
object Main extends IOApp.Simple:
  import TaskManager.Task

  // Existing tasks, that can be loaded from database
  val existingTasks: List[Task] = List(
    Task("task-1", "Task 1 has been executed", "*-*-* *:*:0/1"),
    Task("task-2", "Task 2 has been executed", "*-*-* *:*:0/5"),
    Task("task-3", "Task 3 has been executed", "*-*-* *:*:0/10")
  )

  /** Greets the user and runs the command loop for as long as the task manager
    * resource is held; leaving the loop releases the manager.
    */
  override def run: IO[Unit] =
    IO.println("Welcome!") >>
      TaskManager.applyIO(existingTasks).use { taskManager =>
        Stream
          .repeatEval(IO.println("Please enter your command:") >> IO.readLine)
          .evalTap(command => IO.println(s"Command: $command"))
          // "quit" ends the stream before it is dispatched to runApp
          .takeWhile(_ != "quit")
          .evalTap(command => runApp(command, taskManager))
          .compile
          .drain
      }

  /** Dispatches a single command line to the matching handler.
    *
    * @param command     the raw line typed by the user
    * @param taskManager the manager the command operates on
    */
  def runApp(command: String, taskManager: TaskManager): IO[Unit] =
    command match {
      case "add"    => addTaskCommand(taskManager)
      case "remove" => removeTaskCommand(taskManager)
      case _        => IO.unit
    }

  /** Prompts for the fields of a new task and schedules it.
    *
    * The schedule is free-form user input, so scheduling can fail (for example
    * on a malformed calendar event). Such a failure is reported on the console
    * instead of being propagated, so a typo does not terminate the command loop.
    */
  def addTaskCommand(taskManager: TaskManager): IO[Unit] = for {
    _ <- IO.println("Task name:")
    taskName <- IO.readLine
    _ <- IO.println("Task message:")
    taskMsg <- IO.readLine
    _ <- IO.println("Task schedule:")
    schedule <- IO.readLine
    task = Task(taskName, taskMsg, schedule)
    _ <- taskManager
      .addTask(task)
      .handleErrorWith(error => IO.println(s"Could not add task '$taskName': ${error.getMessage}"))
  } yield ()

  /** Prompts for a task name and asks the manager to remove the matching tasks. */
  def removeTaskCommand(taskManager: TaskManager): IO[Unit] = for {
    _ <- IO.println("Task name:")
    taskName <- IO.readLine
    _ <- taskManager.removeTask(taskName)
  } yield ()
end Main
package com.kha.models
import cats.effect.{Ref, IO, Resource}
import eu.timepit.fs2cron.calev.CalevScheduler
import eu.timepit.fs2cron.Scheduler
import fs2.Stream
import fs2.concurrent.SignallingRef
import com.github.eikek.calev.CalEvent
import monocle.syntax.all._
import cats.syntax.all._
// TODO: Use Supervisor, check usage of Ref
/** Tracks the currently scheduled tasks and lets callers add, remove or replace them.
  *
  * Every running task is stored in `taskRef` together with the signal that
  * interrupts its schedule; setting that signal to `true` stops the task.
  * Instances are created through `TaskManager.applyIO`.
  */
final class TaskManager private (private val taskRef: Ref[IO, List[TaskManager.Task]]):
  import TaskManager.Task

  val scheduler: Scheduler[IO, CalEvent] = CalevScheduler.systemDefault[IO]

  // Upper bound on how many stop signals are raised concurrently.
  private val stopParallelism = 10

  /** Stops every running task, then schedules `newTasks` in their place.
    *
    * @param newTasks the tasks to run from now on
    */
  def setTasks(newTasks: List[Task]): IO[Unit] =
    // Stop all tasks and clear state, then add and start the new tasks
    purge() >> newTasks.parTraverse_(addTask)

  /** Stops every running task and leaves the manager empty. */
  def purge(): IO[Unit] =
    // The state is cleared atomically first; signals are raised afterwards.
    taskRef.getAndSet(Nil).flatMap(stopAll)

  /** Schedules `task` and records it together with its stop signal.
    *
    * @param task the task to schedule; any `signalHandler` it carries is replaced
    */
  def addTask(task: Task): IO[Unit] = for {
    // Should schedule a cronjob with a handler to cancel
    signal <- TaskManager.runTask(task, scheduler)
    runningTask = task.focus(_.signalHandler).replace(signal.some)
    _ <- taskRef.update(runningTask :: _)
  } yield ()

  /** Stops and forgets every task whose name equals `taskName`.
    *
    * @param taskName name of the task(s) to remove; unknown names are a no-op
    */
  def removeTask(taskName: String): IO[Unit] =
    // Should cancel a cronjob using the handler
    taskRef
      .modify { tasks =>
        val (removed, kept) = tasks.partition(_.name == taskName)
        (kept, removed)
      }
      .flatMap(stopAll)

  /** Raises the stop signal of every task in `tasks` that has one. */
  private def stopAll(tasks: List[Task]): IO[Unit] =
    IO.parSequenceN(stopParallelism)(
      tasks.flatMap(_.signalHandler.toList).map(_.set(true))
    ).void
/** Companion of [[TaskManager]]: the task model, the resource-safe constructor
  * and the function that actually schedules a task.
  */
object TaskManager:
  /** A schedule expression in the calendar-event syntax parsed by `CalEvent`. */
  type CronSchedule = String

  /** A message printed on a schedule.
    *
    * @param name          identifier used to remove the task
    * @param message       text printed on every execution
    * @param schedule      calendar-event expression, e.g. `*-*-* *:*:0/5`
    * @param signalHandler when present, setting it to `true` stops future executions
    */
  final case class Task(
      name: String,
      message: String,
      schedule: CronSchedule,
      signalHandler: Option[Ref[IO, Boolean]] = None
  )

  /** Builds a task manager as a resource and starts `tasks` on it.
    *
    * The release action is registered before any task is started, so tasks that
    * were already running are stopped even if starting a later one fails.
    *
    * @param tasks tasks to schedule immediately (none by default)
    */
  def applyIO(tasks: List[Task] = List.empty): Resource[IO, TaskManager] =
    val emptyManagerIO = for {
      _ <- IO.println("Preparing task manager")
      taskRef <- Ref[IO].of(List.empty[Task])
    } yield TaskManager(taskRef)
    Resource
      .make(emptyManagerIO)(tm => IO.println("Releasing task manager resources") *> tm.purge())
      // Kept uncancelable, as it was when it ran inside the acquire step.
      .evalTap(tm => tm.setTasks(tasks).uncancelable)

  /** Starts `task` on `scheduler` in the background.
    *
    * The returned IO fails if `task.schedule` is not a valid calendar event; in
    * that case nothing is started.
    *
    * @param task      the task whose message is printed on every tick
    * @param scheduler the scheduler producing the ticks
    * @return a signal that stops the task once it is set to `true`
    */
  def runTask(task: Task, scheduler: Scheduler[IO, CalEvent]): IO[SignallingRef[IO, Boolean]] = for {
    // CalEvent.unsafe throws on malformed input; validate before allocating anything.
    time <- IO(CalEvent.unsafe(task.schedule))
    signal <- SignallingRef[IO, Boolean](false)
    execution = IO.println(task.message).uncancelable
    scheduledTask = scheduler.awakeEvery(time) >> Stream.eval(execution)
    // The fiber is intentionally not kept: the task is stopped through `signal`.
    _ <- scheduledTask.interruptWhen(signal).compile.drain.start
  } yield signal
@khanetor
Copy link
Author

Job scheduling with FS2-cron

This is a demo of how to create a job scheduling system that lets you interactively create and remove jobs that run on a periodic cron schedule. A job is encapsulated in the TaskManager.Task data structure. This structure has an optional signalHandler that, when present, can be used to stop future executions of the job.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment