Last active April 23, 2019 10:18
// This is a demo how functional and recursive thinking could help us write unbreakable, stack safe code.
// **This is not any IP, but a general pattern of code solving a usual simple usecase**
// and that also show how safe it is to play with lazy structures (although I did that with scala.Stream as my project didnt have dependencies to fs2)
// Do note that, the entire logic can be just tested with `scala.Double, or scala.Int` and needn't get into the details of `java.time.Instant`
* Given the data freshness (eg: data always arrives by 2 days late: hence offset = 2.days,
* and the schedule be once in 10 days (schedule = 10.days),
* find out all the time periods (start time and end time) for which the spark job should run
* such that the last result was updated at `lastUpdatedTime`. It should also handle cron schedules.
* Also what happens if you are allowed to have an overlap while finding out the list of start-time and end-time
// scalastyle:off cyclomatic.complexity
import scalaz.Order
import scalaz.syntax.order._
import scalaz.syntax.std.boolean._
import scalaz.syntax.either._
import scalaz.{ \/-, \/ }
import Math._
final case class TimePeriod[A] private (start: A, end: A)
object TimePeriod {
def findAllTimePeriods[A : Order, B](
lastUpdatedTime: Option[A],
offset: B,
schedule: B,
now: A,
overlapTime: B
)(implicit P: Math[A, B]): String \/ Stream[TimePeriod[A]] = {
lastUpdatedTime match {
case Some(l) =>
val initialState =
for {
st <- l minus overlapTime
en <- st plus schedule
res <- (!(en > now || st === en)).option(TimePeriod(st, en)).right[String]
} yield res
case None =>
case Some(r) =>
unfold(r)(currentTime =>
for {
nextTime <- (currentTime.end plus schedule).map(t => TimePeriod(currentTime.end, t))
res <- (nextTime.end <= now && (currentTime.end /== nextTime.end)).option((nextTime, nextTime)).right
} yield res
).map(r #:: _)
case None =>
val endDate = now
val startDate =
for {
res <- P.minus(endDate, schedule)
res2 <- P.minus(res, offset)
} yield res2 => Stream(TimePeriod[A](st, now)))
private def unfold[S, A](z: S)(f: S => String \/ Option[(S, A)]): String \/ Stream[A] =
case Some((a, b)) => \/-(Stream.cons(b, unfold(a)(f).fold[Stream[A]](_ => Stream.empty[A], identity[Stream[A]])))
case None => Stream.empty[A].right
import scalaz.\/
import scalaz.\/-
import scalaz.-\/
import scalaz.syntax.either._
import CronOps._
trait Math[A, B] {
def minus(a: A, duration: B): String \/ A
def plus(a: A, duration: B): String \/ A
object Math extends MathSyntax {
def apply[A, B](implicit ev: Math[A, B]): PlusMinus[A, B] = ev
def instance[A, B](f: (A, B) => String \/ A, g: (A, B) => String \/ A): Math[A, B] =
new Math[A, B] {
override def minus(a: A, duration: B): \/[String,A] = f(a, duration)
override def plus(a: A, duration: B): String \/ A = g(a, duration)
implicit val mathld: Math[LocalDateTime, Duration] =
instance((a, b) => a.minus(b).right, (a, b) =>
implicit val PlusMinusDouble: PlusMinus[Double, Double] =
instance((a, b) => (a - b).right, ((a, b) => (a + b).right))
implicit val mathsch: Math[LocalDateTime, Schedule] =
(local, duration) =>
duration.value match {
case \/-(b) => local.minus(b._1, b._2).right
case -\/(a) => lastExecutionTime(a.toString.toUpperCase(), local).leftMap(_.shows)
(local, duration) =>
duration.value match {
case \/-(aa) =>, aa._2).right
case -\/(b) => nextExecutionTime(b.toString.toUpperCase(), local).leftMap(_.shows)
trait MathSyntax {
implicit class MathOps[A, B](self: A)(implicit ev: Math[A, B]) {
def plus (a: B) =, a)
def minus (a: B) = ev.minus(self, a)
final case class Schedule(value: CronExpr \/ (Long, ChronoUnit))
trait CronOps {
val definition: FailedCronDef \/ CronDefinition =
.withValidRange(1, 7)
.withValidRange(1970, 2099)
lazy val parser: CronOpError \/ CronParser = CronParser(_))
private def parseCron(string: String): CronOpError \/ Cron =
parser.flatMap(t => \/.fromTryCatchNonFatal(t.parse(string)).leftMap(CronParseException.apply))
private def executionTime(string: String): CronOpError \/ ExecutionTime =
parseCron(string.toUpperCase()).flatMap(cron => \/.fromTryCatchNonFatal(ExecutionTime.forCron(cron)).leftMap(FailedExecutionTimeForCron.apply))
private def oneStep(t: ExecutionTime, now: LocalDateTime, dir: StepDirection): CronOpError \/ LocalDateTime =
for {
r <- \/.fromTryCatchNonFatal(dir.nextTime(t)(now)).leftMap(t => StepException(t, now, Forward(1)))
v <- \/.fromTryCatchNonFatal(r.get()).leftMap(t => StepException(new RuntimeException(t.getMessage), now, Forward(1)))
} yield v.toLocalDateTime
// A better type driven step calculation.
def step(t: ExecutionTime, now: LocalDateTime, n: StepDirection): CronOpError \/ LocalDateTime = {
def go(l: LocalDateTime, count: Int): CronOpError \/ LocalDateTime =
if (n == 0)
else {
val next = singleJump(t, l, n)
next.fold(_.left, go(_, count - 1))
go(now, n.step)
// interop with cron4s. The whole application was initially built in cron4s (which was found buggy at a later stage.)
def stepS(schedule: CronExpr, now: LocalDateTime, n: StepDirection): CronOpError \/ LocalDateTime = {
lazy val exec = executionTime(schedule.toString)
exec.flatMap(step(_, now, n))
object CronOps extends CronOps {
abstract sealed class CronOpError(val underlying: Throwable)
final case class FailedCronDef(t: Throwable) extends CronOpError(t)
final case class CronParseException(t: Throwable) extends CronOpError(t)
final case class FailedExecutionTimeForCron(t: Throwable) extends CronOpError(t)
final case class StepException(t: Throwable, currentTime: LocalDateTime, direction: StepDirection) extends CronOpError(t)
implicit val show: Show[CronOpError] = Show.shows[CronOpError]({
case FailedCronDef(t) => s"Invalid cron defintion: ${t.getMessage} ${Option(t.getCause).getOrElse("")}"
case CronParseException(t) => s"Invalid cron string: ${t.getMessage} ${Option(t.getCause).getOrElse("")}"
case FailedExecutionTimeForCron(t) => s"Failed to find the execution time from cron: ${t.getMessage} ${Option(t.getCause).getOrElse("")}"
case StepException(t, d, dir) =>
s"""Failed to step through the cron: ${t.getMessage} ${Option(t.getCause).getOrElse("")}.
The point where it failed was: ${d.toString}.
Step: ${dir.toString}
abstract sealed class StepDirection(val step: Int) {
override def toString = this match {
case Forward(n) => s"step forward by ${n}"
case Backward(n) => s"step backward by ${n}"
val nextTime: ExecutionTime => LocalDateTime => Optional[ZonedDateTime] =
a => b => a.nextExecution(ZonedDateTime.of(b, ZoneId.systemDefault()))
object StepDirection {
final case class Forward(n: Int) extends StepDirection(n)
final case class Backward(n: Int) extends StepDirection(n)
