Last active
April 23, 2019 10:18
-
-
Save afsalthaj/ed1c32507c95913eb96856412c36e55a to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// This is a demo how functional and recursive thinking could help us write unbreakable, stack safe code. | |
// **This is not any IP, but a general pattern of code solving a usual simple usecase** | |
// and that also show how safe it is to play with lazy structures (although I did that with scala.Stream as my project didnt have dependencies to fs2) | |
// Do note that, the entire logic can be just tested with `scala.Double, or scala.Int` and needn't get into the details of `java.time.Instant` | |
/** | |
* Given the data freshness (eg: data always arrives by 2 days late: hence offset = 2.days, | |
* and the schedule be once in 10 days (schedule = 10.days), | |
* find out all the time periods (start time and end time) for which the spark job should run | |
* such that the last result was updated at `lastUpdatedTime`. It should also handle cron schedules. | |
* Also what happens if you are allowed to have an overlap while finding out the list of start-time and end-time | |
*/ | |
// scalastyle:off cyclomatic.complexity | |
import scalaz.Order | |
import scalaz.syntax.order._ | |
import scalaz.syntax.std.boolean._ | |
import scalaz.syntax.either._ | |
import scalaz.{ \/-, \/ } | |
import Math._ | |
final case class TimePeriod[A] private (start: A, end: A) | |
object TimePeriod { | |
def findAllTimePeriods[A : Order, B]( | |
lastUpdatedTime: Option[A], | |
offset: B, | |
schedule: B, | |
now: A, | |
overlapTime: B | |
)(implicit P: Math[A, B]): String \/ Stream[TimePeriod[A]] = { | |
lastUpdatedTime match { | |
case Some(l) => | |
val initialState = | |
for { | |
st <- l minus overlapTime | |
en <- st plus schedule | |
res <- (!(en > now || st === en)).option(TimePeriod(st, en)).right[String] | |
} yield res | |
initialState.flatMap({ | |
case None => | |
\/-(Stream.empty[TimePeriod[A]]) | |
case Some(r) => | |
unfold(r)(currentTime => | |
for { | |
nextTime <- (currentTime.end plus schedule).map(t => TimePeriod(currentTime.end, t)) | |
res <- (nextTime.end <= now && (currentTime.end /== nextTime.end)).option((nextTime, nextTime)).right | |
} yield res | |
).map(r #:: _) | |
}) | |
case None => | |
val endDate = now | |
val startDate = | |
for { | |
res <- P.minus(endDate, schedule) | |
res2 <- P.minus(res, offset) | |
} yield res2 | |
startDate.map(st => Stream(TimePeriod[A](st, now))) | |
} | |
} | |
private def unfold[S, A](z: S)(f: S => String \/ Option[(S, A)]): String \/ Stream[A] = | |
f(z).flatMap({ | |
case Some((a, b)) => \/-(Stream.cons(b, unfold(a)(f).fold[Stream[A]](_ => Stream.empty[A], identity[Stream[A]]))) | |
case None => Stream.empty[A].right | |
}) | |
} | |
//////////////// | |
import scalaz.\/ | |
import scalaz.\/- | |
import scalaz.-\/ | |
import scalaz.syntax.show._ | |
import scalaz.syntax.either._ | |
import CronOps._ | |
trait Math[A, B] { | |
def minus(a: A, duration: B): String \/ A | |
def plus(a: A, duration: B): String \/ A | |
} | |
object Math extends MathSyntax { | |
def apply[A, B](implicit ev: Math[A, B]): PlusMinus[A, B] = ev | |
def instance[A, B](f: (A, B) => String \/ A, g: (A, B) => String \/ A): Math[A, B] = | |
new Math[A, B] { | |
override def minus(a: A, duration: B): \/[String,A] = f(a, duration) | |
override def plus(a: A, duration: B): String \/ A = g(a, duration) | |
} | |
implicit val mathld: Math[LocalDateTime, Duration] = | |
instance((a, b) => a.minus(b).right, (a, b) => a.plus(b).right) | |
implicit val PlusMinusDouble: PlusMinus[Double, Double] = | |
instance((a, b) => (a - b).right, ((a, b) => (a + b).right)) | |
implicit val mathsch: Math[LocalDateTime, Schedule] = | |
instance( | |
(local, duration) => | |
duration.value match { | |
case \/-(b) => local.minus(b._1, b._2).right | |
case -\/(a) => lastExecutionTime(a.toString.toUpperCase(), local).leftMap(_.shows) | |
}, | |
(local, duration) => | |
duration.value match { | |
case \/-(aa) => local.plus(aa._1, aa._2).right | |
case -\/(b) => nextExecutionTime(b.toString.toUpperCase(), local).leftMap(_.shows) | |
} | |
) | |
} | |
trait MathSyntax { | |
implicit class MathOps[A, B](self: A)(implicit ev: Math[A, B]) { | |
def plus (a: B) = ev.plus(self, a) | |
def minus (a: B) = ev.minus(self, a) | |
} | |
} | |
////////////////////// | |
final case class Schedule(value: CronExpr \/ (Long, ChronoUnit)) | |
/////////////////////// | |
trait CronOps { | |
val definition: FailedCronDef \/ CronDefinition = | |
\/.fromTryCatchNonFatal( | |
CronDefinitionBuilder | |
.defineCron() | |
.withSeconds() | |
.and() | |
.withMinutes() | |
.and() | |
.withHours() | |
.and() | |
.withDayOfMonth() | |
.supportsL() | |
.supportsW() | |
.supportsLW() | |
.supportsQuestionMark() | |
.and() | |
.withMonth() | |
.and() | |
.withDayOfWeek() | |
.withValidRange(1, 7) | |
.withMondayDoWValue(2) | |
.supportsHash() | |
.supportsL() | |
.supportsQuestionMark() | |
.and() | |
.withYear() | |
.withValidRange(1970, 2099) | |
.optional() | |
.and() | |
.withCronValidation( | |
CronConstraintsFactory.ensureEitherDayOfWeekOrDayOfMonth() | |
) | |
.instance() | |
).leftMap(FailedCronDef.apply) | |
lazy val parser: CronOpError \/ CronParser = | |
definition.map(new CronParser(_)) | |
private def parseCron(string: String): CronOpError \/ Cron = | |
parser.flatMap(t => \/.fromTryCatchNonFatal(t.parse(string)).leftMap(CronParseException.apply)) | |
private def executionTime(string: String): CronOpError \/ ExecutionTime = | |
parseCron(string.toUpperCase()).flatMap(cron => \/.fromTryCatchNonFatal(ExecutionTime.forCron(cron)).leftMap(FailedExecutionTimeForCron.apply)) | |
private def oneStep(t: ExecutionTime, now: LocalDateTime, dir: StepDirection): CronOpError \/ LocalDateTime = | |
for { | |
r <- \/.fromTryCatchNonFatal(dir.nextTime(t)(now)).leftMap(t => StepException(t, now, Forward(1))) | |
v <- \/.fromTryCatchNonFatal(r.get()).leftMap(t => StepException(new RuntimeException(t.getMessage), now, Forward(1))) | |
} yield v.toLocalDateTime | |
// A better type driven step calculation. | |
def step(t: ExecutionTime, now: LocalDateTime, n: StepDirection): CronOpError \/ LocalDateTime = { | |
def go(l: LocalDateTime, count: Int): CronOpError \/ LocalDateTime = | |
if (n == 0) | |
l.right | |
else { | |
val next = singleJump(t, l, n) | |
next.fold(_.left, go(_, count - 1)) | |
} | |
go(now, n.step) | |
} | |
// interop with cron4s. The whole application was initially built in cron4s (which was found buggy at a later stage.) | |
def stepS(schedule: CronExpr, now: LocalDateTime, n: StepDirection): CronOpError \/ LocalDateTime = { | |
lazy val exec = executionTime(schedule.toString) | |
exec.flatMap(step(_, now, n)) | |
} | |
} | |
object CronOps extends CronOps { | |
abstract sealed class CronOpError(val underlying: Throwable) | |
final case class FailedCronDef(t: Throwable) extends CronOpError(t) | |
final case class CronParseException(t: Throwable) extends CronOpError(t) | |
final case class FailedExecutionTimeForCron(t: Throwable) extends CronOpError(t) | |
final case class StepException(t: Throwable, currentTime: LocalDateTime, direction: StepDirection) extends CronOpError(t) | |
implicit val show: Show[CronOpError] = Show.shows[CronOpError]({ | |
case FailedCronDef(t) => s"Invalid cron defintion: ${t.getMessage} ${Option(t.getCause).getOrElse("")}" | |
case CronParseException(t) => s"Invalid cron string: ${t.getMessage} ${Option(t.getCause).getOrElse("")}" | |
case FailedExecutionTimeForCron(t) => s"Failed to find the execution time from cron: ${t.getMessage} ${Option(t.getCause).getOrElse("")}" | |
case StepException(t, d, dir) => | |
s"""Failed to step through the cron: ${t.getMessage} ${Option(t.getCause).getOrElse("")}. | |
The point where it failed was: ${d.toString}. | |
Step: ${dir.toString} | |
""" | |
}) | |
abstract sealed class StepDirection(val step: Int) { | |
override def toString = this match { | |
case Forward(n) => s"step forward by ${n}" | |
case Backward(n) => s"step backward by ${n}" | |
} | |
val nextTime: ExecutionTime => LocalDateTime => Optional[ZonedDateTime] = | |
a => b => a.nextExecution(ZonedDateTime.of(b, ZoneId.systemDefault())) | |
} | |
object StepDirection { | |
final case class Forward(n: Int) extends StepDirection(n) | |
final case class Backward(n: Int) extends StepDirection(n) | |
} | |
//////////////// | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment