Skip to content

Instantly share code, notes, and snippets.

@afsalthaj
Last active April 23, 2019 10:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save afsalthaj/ed1c32507c95913eb96856412c36e55a to your computer and use it in GitHub Desktop.
Save afsalthaj/ed1c32507c95913eb96856412c36e55a to your computer and use it in GitHub Desktop.
// This is a demo how functional and recursive thinking could help us write unbreakable, stack safe code.
// **This is not any IP, but a general pattern of code solving a usual simple usecase**
// and that also show how safe it is to play with lazy structures (although I did that with scala.Stream as my project didnt have dependencies to fs2)
// Do note that, the entire logic can be just tested with `scala.Double, or scala.Int` and needn't get into the details of `java.time.Instant`
/**
* Given the data freshness (eg: data always arrives by 2 days late: hence offset = 2.days,
* and the schedule be once in 10 days (schedule = 10.days),
* find out all the time periods (start time and end time) for which the spark job should run
* such that the last result was updated at `lastUpdatedTime`. It should also handle cron schedules.
* Also what happens if you are allowed to have an overlap while finding out the list of start-time and end-time
*/
// scalastyle:off cyclomatic.complexity
import scalaz.Order
import scalaz.syntax.order._
import scalaz.syntax.std.boolean._
import scalaz.syntax.either._
import scalaz.{ \/-, \/ }
import Math._
final case class TimePeriod[A] private (start: A, end: A)
object TimePeriod {
def findAllTimePeriods[A : Order, B](
lastUpdatedTime: Option[A],
offset: B,
schedule: B,
now: A,
overlapTime: B
)(implicit P: Math[A, B]): String \/ Stream[TimePeriod[A]] = {
lastUpdatedTime match {
case Some(l) =>
val initialState =
for {
st <- l minus overlapTime
en <- st plus schedule
res <- (!(en > now || st === en)).option(TimePeriod(st, en)).right[String]
} yield res
initialState.flatMap({
case None =>
\/-(Stream.empty[TimePeriod[A]])
case Some(r) =>
unfold(r)(currentTime =>
for {
nextTime <- (currentTime.end plus schedule).map(t => TimePeriod(currentTime.end, t))
res <- (nextTime.end <= now && (currentTime.end /== nextTime.end)).option((nextTime, nextTime)).right
} yield res
).map(r #:: _)
})
case None =>
val endDate = now
val startDate =
for {
res <- P.minus(endDate, schedule)
res2 <- P.minus(res, offset)
} yield res2
startDate.map(st => Stream(TimePeriod[A](st, now)))
}
}
private def unfold[S, A](z: S)(f: S => String \/ Option[(S, A)]): String \/ Stream[A] =
f(z).flatMap({
case Some((a, b)) => \/-(Stream.cons(b, unfold(a)(f).fold[Stream[A]](_ => Stream.empty[A], identity[Stream[A]])))
case None => Stream.empty[A].right
})
}
////////////////
import scalaz.\/
import scalaz.\/-
import scalaz.-\/
import scalaz.syntax.show._
import scalaz.syntax.either._
import CronOps._
trait Math[A, B] {
def minus(a: A, duration: B): String \/ A
def plus(a: A, duration: B): String \/ A
}
object Math extends MathSyntax {
def apply[A, B](implicit ev: Math[A, B]): PlusMinus[A, B] = ev
def instance[A, B](f: (A, B) => String \/ A, g: (A, B) => String \/ A): Math[A, B] =
new Math[A, B] {
override def minus(a: A, duration: B): \/[String,A] = f(a, duration)
override def plus(a: A, duration: B): String \/ A = g(a, duration)
}
implicit val mathld: Math[LocalDateTime, Duration] =
instance((a, b) => a.minus(b).right, (a, b) => a.plus(b).right)
implicit val PlusMinusDouble: PlusMinus[Double, Double] =
instance((a, b) => (a - b).right, ((a, b) => (a + b).right))
implicit val mathsch: Math[LocalDateTime, Schedule] =
instance(
(local, duration) =>
duration.value match {
case \/-(b) => local.minus(b._1, b._2).right
case -\/(a) => lastExecutionTime(a.toString.toUpperCase(), local).leftMap(_.shows)
},
(local, duration) =>
duration.value match {
case \/-(aa) => local.plus(aa._1, aa._2).right
case -\/(b) => nextExecutionTime(b.toString.toUpperCase(), local).leftMap(_.shows)
}
)
}
trait MathSyntax {
implicit class MathOps[A, B](self: A)(implicit ev: Math[A, B]) {
def plus (a: B) = ev.plus(self, a)
def minus (a: B) = ev.minus(self, a)
}
}
//////////////////////
final case class Schedule(value: CronExpr \/ (Long, ChronoUnit))
///////////////////////
trait CronOps {
val definition: FailedCronDef \/ CronDefinition =
\/.fromTryCatchNonFatal(
CronDefinitionBuilder
.defineCron()
.withSeconds()
.and()
.withMinutes()
.and()
.withHours()
.and()
.withDayOfMonth()
.supportsL()
.supportsW()
.supportsLW()
.supportsQuestionMark()
.and()
.withMonth()
.and()
.withDayOfWeek()
.withValidRange(1, 7)
.withMondayDoWValue(2)
.supportsHash()
.supportsL()
.supportsQuestionMark()
.and()
.withYear()
.withValidRange(1970, 2099)
.optional()
.and()
.withCronValidation(
CronConstraintsFactory.ensureEitherDayOfWeekOrDayOfMonth()
)
.instance()
).leftMap(FailedCronDef.apply)
lazy val parser: CronOpError \/ CronParser =
definition.map(new CronParser(_))
private def parseCron(string: String): CronOpError \/ Cron =
parser.flatMap(t => \/.fromTryCatchNonFatal(t.parse(string)).leftMap(CronParseException.apply))
private def executionTime(string: String): CronOpError \/ ExecutionTime =
parseCron(string.toUpperCase()).flatMap(cron => \/.fromTryCatchNonFatal(ExecutionTime.forCron(cron)).leftMap(FailedExecutionTimeForCron.apply))
private def oneStep(t: ExecutionTime, now: LocalDateTime, dir: StepDirection): CronOpError \/ LocalDateTime =
for {
r <- \/.fromTryCatchNonFatal(dir.nextTime(t)(now)).leftMap(t => StepException(t, now, Forward(1)))
v <- \/.fromTryCatchNonFatal(r.get()).leftMap(t => StepException(new RuntimeException(t.getMessage), now, Forward(1)))
} yield v.toLocalDateTime
// A better type driven step calculation.
def step(t: ExecutionTime, now: LocalDateTime, n: StepDirection): CronOpError \/ LocalDateTime = {
def go(l: LocalDateTime, count: Int): CronOpError \/ LocalDateTime =
if (n == 0)
l.right
else {
val next = singleJump(t, l, n)
next.fold(_.left, go(_, count - 1))
}
go(now, n.step)
}
// interop with cron4s. The whole application was initially built in cron4s (which was found buggy at a later stage.)
def stepS(schedule: CronExpr, now: LocalDateTime, n: StepDirection): CronOpError \/ LocalDateTime = {
lazy val exec = executionTime(schedule.toString)
exec.flatMap(step(_, now, n))
}
}
object CronOps extends CronOps {
abstract sealed class CronOpError(val underlying: Throwable)
final case class FailedCronDef(t: Throwable) extends CronOpError(t)
final case class CronParseException(t: Throwable) extends CronOpError(t)
final case class FailedExecutionTimeForCron(t: Throwable) extends CronOpError(t)
final case class StepException(t: Throwable, currentTime: LocalDateTime, direction: StepDirection) extends CronOpError(t)
implicit val show: Show[CronOpError] = Show.shows[CronOpError]({
case FailedCronDef(t) => s"Invalid cron defintion: ${t.getMessage} ${Option(t.getCause).getOrElse("")}"
case CronParseException(t) => s"Invalid cron string: ${t.getMessage} ${Option(t.getCause).getOrElse("")}"
case FailedExecutionTimeForCron(t) => s"Failed to find the execution time from cron: ${t.getMessage} ${Option(t.getCause).getOrElse("")}"
case StepException(t, d, dir) =>
s"""Failed to step through the cron: ${t.getMessage} ${Option(t.getCause).getOrElse("")}.
The point where it failed was: ${d.toString}.
Step: ${dir.toString}
"""
})
abstract sealed class StepDirection(val step: Int) {
override def toString = this match {
case Forward(n) => s"step forward by ${n}"
case Backward(n) => s"step backward by ${n}"
}
val nextTime: ExecutionTime => LocalDateTime => Optional[ZonedDateTime] =
a => b => a.nextExecution(ZonedDateTime.of(b, ZoneId.systemDefault()))
}
object StepDirection {
final case class Forward(n: Int) extends StepDirection(n)
final case class Backward(n: Int) extends StepDirection(n)
}
////////////////
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment