Skip to content

Instantly share code, notes, and snippets.

@fversnel
Last active February 11, 2016 08:56
Show Gist options
  • Save fversnel/4fa73ad78c47f08ac587 to your computer and use it in GitHub Desktop.
Save fversnel/4fa73ad78c47f08ac587 to your computer and use it in GitHub Desktop.
Creates an Observable from a cron expression using the Quartz scheduler
package org.frankversnel.rxquartz
import java.time.Instant
import java.util.UUID
import scala.concurrent.{ExecutionContext, Promise, Future}
import com.typesafe.scalalogging.Logger
import org.quartz._
import rx.lang.scala.{Observer, Subscription, Observable}
import scala.concurrent.ExecutionContext
object Quartz {
/**
 * Pairs a Quartz `Scheduler` with an [[AsyncCancellable]] that is closed together with it.
 *
 * @param scheduler the underlying Quartz scheduler
 * @param ec        execution context handed to the [[AsyncCancellable]]
 */
class QuartzScheduler(val scheduler: Scheduler)(implicit ec: ExecutionContext) extends AutoCloseable {
  /** Cancellation signal that is closed when this scheduler is closed. */
  val cancellable: AsyncCancellable = new AsyncCancellable

  /** Starts the underlying Quartz scheduler. */
  def start(): Unit = scheduler.start()

  /**
   * Closes the cancellable and then shuts the Quartz scheduler down via `shutdown(true)`.
   *
   * The shutdown is placed in a `finally` so the scheduler is still shut down when
   * closing the cancellable throws; the exception is propagated afterwards.
   */
  override def close(): Unit =
    try cancellable.close()
    finally scheduler.shutdown(true)
}
/**
 * Quartz job that emits the current instant to the observer stored in the scheduler
 * context under the key found in this job's "jobId" data-map entry.
 */
private class CronJob extends Job {
  /**
   * Looks up the observer registered for this job and calls `onNext(Instant.now())` on it.
   *
   * Does nothing when no observer is registered under the job id: the context entry is
   * removed when the subscription is torn down, so a trigger that fires around that
   * moment would otherwise dereference `null`.
   *
   * @param context execution context supplied by Quartz
   */
  override def execute(context: JobExecutionContext): Unit = {
    val jobId = context.getMergedJobDataMap.getString("jobId")
    Option(context.getScheduler.getContext.get(jobId))
      .collect { case observer: Observer[Instant @unchecked] => observer }
      .foreach(_.onNext(Instant.now()))
  }
}
/** Adds cron-driven Observable creation to a [[QuartzScheduler]]. */
implicit class SchedulerExtensions(quartzScheduler: QuartzScheduler) {
  /**
   * Creates an Observable that triggers based on a Cron expression.
   *
   * Each subscription schedules its own Quartz job under a fresh random id and stores
   * the subscriber in the scheduler context under that id. Unsubscribing (or closing
   * the [[QuartzScheduler]]) deletes the job, removes the context entry and then
   * completes the subscriber.
   *
   * @param cronExpression a valid cron expression
   * @param logger         logger used to report failures while deleting the Quartz job
   * @see http://www.quartz-scheduler.org/documentation/quartz-1.x/tutorials/crontrigger
   * @return an Observable emitting `Instant.now()` every time the job is executed
   */
  def createCronJob(cronExpression: String)(implicit logger: Logger): Observable[Instant] = {
    import JobBuilder._
    import TriggerBuilder._
    import CronScheduleBuilder._
    Observable.create(obs => {
      val jobId = UUID.randomUUID().toString
      val job = newJob(classOf[CronJob])
        .usingJobData("jobId", jobId)
        .build()
      val trigger = newTrigger()
        .withSchedule(cronSchedule(cronExpression))
        .build()
      val scheduler = quartzScheduler.scheduler
      scheduler.getContext.put(jobId, obs)
      try scheduler.scheduleJob(job, trigger)
      catch {
        case e: Exception =>
          // Scheduling failed: do not leave the observer behind in the scheduler context.
          scheduler.getContext.remove(jobId)
          throw e
      }
      val subscription = Subscription {
        try {
          // Stop the job before completing the observer so it cannot emit afterwards,
          // and drop the context entry even when deleting the job fails.
          try scheduler.deleteJob(job.getKey)
          finally scheduler.getContext.remove(jobId)
        } catch {
          case e: Exception =>
            logger.debug(s"Something went wrong when deleting quartz jobs, due to: ${e.getMessage}", e)
        } finally {
          obs.onCompleted()
        }
      }
      // Closing the scheduler ends every subscription that is still open.
      quartzScheduler.cancellable.whenClosed(() => subscription.unsubscribe())
      subscription
    })
  }
}
/** Failure with which an [[AsyncCancellable]] completes its promise when it is closed. */
class CancelledException extends java.lang.Exception
/**
 * One-shot cancellation signal backed by a promise that is failed with a
 * [[CancelledException]] when [[close]] is called.
 *
 * @param ec execution context used to run the callbacks registered via [[whenClosed]]
 */
class AsyncCancellable(implicit ec: ExecutionContext) extends AutoCloseable {
  // Typed as Nothing: the promise is never fulfilled, only failed by close().
  private val p = Promise[Nothing]()

  /** Future that fails with a [[CancelledException]] once this cancellable is closed. */
  def onCancel: Future[Nothing] = p.future

  /**
   * Registers a callback that is run once this cancellable has been closed.
   *
   * @param work the action to run when [[onCancel]] completes
   */
  def whenClosed(work: () => Unit): Unit =
    onCancel.onComplete(_ => work())

  /**
   * Fails the promise with a [[CancelledException]].
   *
   * Uses `tryFailure` so that closing more than once is a no-op instead of throwing
   * `IllegalStateException` from the already-completed promise.
   */
  override def close(): Unit = {
    p.tryFailure(new CancelledException)
    ()
  }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment