Skip to content

Instantly share code, notes, and snippets.

@djspiewak
Created July 23, 2018 17:45
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save djspiewak/f1368e7cac954283e78dffefb33f8b7e to your computer and use it in GitHub Desktop.
Save djspiewak/f1368e7cac954283e78dffefb33f8b7e to your computer and use it in GitHub Desktop.
/*
* Copyright 2014–2018 SlamData Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package quasar.impl.table
import slamdata.Predef._
import quasar.Condition
import quasar.api.QueryEvaluator
import cats.effect.{ConcurrentEffect, Effect}
import fs2.{async, Stream}
import fs2.async.mutable.Queue
// monad/traverse syntax conflict
import scalaz.{Equal, OptionT, Scalaz}, Scalaz._
import shims._
import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import java.time.OffsetDateTime
import java.util.concurrent.ConcurrentHashMap
/**
 * Runs "table preparations" in the background: each preparation evaluates a
 * query, pipes the result through `runToStore`, and reports the outcome on
 * the `notifications` stream.
 *
 * Instances are created through the companion's `apply`, which also supplies
 * the notification queue and the `background` submission function.
 *
 * @param evaluator      evaluates a query `Q` into a result `R`
 * @param notificationsQ queue on which success/error notifications are enqueued
 * @param background     submits a stream to be run in the background
 * @param runToStore     produces the stream that persists a result for a table id
 */
class PreparationsManager[F[_]: Effect, I, Q, R] private (
    evaluator: QueryEvaluator[F, Q, R],
    notificationsQ: Queue[F, Option[PreparationsManager.TableNotification[I]]],
    background: Stream[F, Nothing] => F[Unit])(
    runToStore: (I, R) => F[Stream[F, Unit]])(
    implicit ec: ExecutionContext) {

  import PreparationsManager._

  // Preparations currently registered as running, keyed by table id.
  private val ongoing: ConcurrentHashMap[I, Preparation[F]] =
    new ConcurrentHashMap[I, Preparation[F]]

  val F = Effect[F]

  /** Notifications emitted by preparations; ends when `None` is dequeued. */
  val notifications = notificationsQ.dequeue.unNoneTerminate

  // Wall-clock time between two timestamps, at millisecond resolution.
  private def elapsed(start: OffsetDateTime, end: OffsetDateTime): FiniteDuration =
    (end.toInstant.toEpochMilli - start.toInstant.toEpochMilli).millis

  /**
   * Evaluates `query`, builds the persisting stream via `runToStore`, and
   * submits it to `background`. The returned effect completes once the stream
   * has been submitted, not once the preparation has finished.
   *
   * On normal, non-canceled completion a `PreparationSucceeded` is enqueued;
   * if the stream fails a `PreparationErrored` is enqueued instead. A canceled
   * preparation produces no notification.
   *
   * NOTE(review): registration uses an unconditional `put`, so preparing a
   * table id that is already registered replaces the earlier entry.
   */
  // fake Equal for fake parametricity
  def prepareTable(tableId: I, query: Q)(implicit I: Equal[I]): F[Unit] = {
    for {
      result <- evaluator.evaluate(query)
      persist <- runToStore(tableId, result)
      // cancellation signal: setting it to true interrupts the persisting stream
      s <- async.signalOf[F, Boolean](false)

      configured = Stream.eval(F.delay(OffsetDateTime.now())) flatMap { start =>
        val preparation = Preparation(s.set(true), start)

        val notifySuccess = for {
          canceled <- s.get

          _ <- if (canceled) {
            ().point[F]
          } else {
            for {
              end <- F.delay(OffsetDateTime.now())

              _ <- notificationsQ.enqueue1(
                Some(
                  TableNotification.PreparationSucceeded(
                    tableId,
                    start,
                    elapsed(start, end))))
            } yield ()
          }
        } yield ()

        // `++` (rather than `onComplete`) so the success notification is only
        // reached when the persisting stream ends without an error;
        // `onComplete` would also run it on failure, before re-raising.
        val halted = persist.interruptWhen(s) ++ Stream.eval_(notifySuccess)

        val handled = halted handleErrorWith { t =>
          val eff = for {
            end <- F.delay(OffsetDateTime.now())

            _ <- notificationsQ.enqueue1(
              Some(
                TableNotification.PreparationErrored(
                  tableId,
                  start,
                  elapsed(start, end),
                  t)))
          } yield ()

          Stream.eval_(eff)
        }

        // register while running; on release, remove only our own entry
        Stream.bracket(
          F.delay(ongoing.put(tableId, preparation)))(
          _ => handled,
          _ => F.delay(ongoing.remove(tableId, preparation)).as(()))
      }

      _ <- background(configured.drain)
    } yield ()
  }

  /**
   * Returns the start time of the preparation registered for `tableId`, or
   * `None` when no preparation is registered for it.
   */
  def preparationStatus(tableId: I)(implicit I: Equal[I]): F[Option[OffsetDateTime]] =
    F.delay(Option(ongoing.get(tableId)).map(_.start))

  /**
   * Cancels the preparation registered for `tableId`.
   *
   * Yields `Condition.Abnormal(NotInProgressError(tableId))` when nothing is
   * registered. Otherwise yields `Condition.Normal()`; the cancel effect is
   * only run when this call is the one that removed the entry.
   */
  def cancelPreparation(tableId: I): F[Condition[NotInProgressError[I]]] = {
    val eff = for {
      live <- OptionT(F.delay(Option(ongoing.get(tableId))))
      removed <- F.delay(ongoing.remove(tableId, live)).liftM[OptionT]

      _ <- if (removed)
        live.cancel.liftM[OptionT]
      else
        ().point[OptionT[F, ?]]
    } yield ()

    eff.run map {
      case Some(_) => Condition.Normal()
      case None => Condition.Abnormal(NotInProgressError(tableId))
    }
  }

  /**
   * Cancels every registered preparation, repeating until a pass observes no
   * registered keys.
   */
  @SuppressWarnings(Array("org.wartremover.warts.Recursion"))
  def cancelAll: F[Unit] = {
    for {
      keys <- F.delay(ongoing.keys.asScala.toList)
      results <- keys.traverse(cancelPreparation)

      _ <- if (results.isEmpty)
        ().point[F]
      else
        cancelAll // handle race condition of new enqueue
    } yield ()
  }
}
/**
 * Constructor and supporting data types for [[PreparationsManager]].
 */
object PreparationsManager {

  /** Handle for a registered preparation: how to cancel it and when it began. */
  private final case class Preparation[F[_]](
      cancel: F[Unit],
      start: OffsetDateTime)

  final case class InProgressError[I](tableId: I)

  final case class NotInProgressError[I](tableId: I)

  /** Outcome of a preparation, as published on the manager's notifications. */
  sealed trait TableNotification[I] extends Product with Serializable

  object TableNotification {

    final case class PreparationSucceeded[I](
        tableId: I,
        start: OffsetDateTime,
        duration: Duration) extends TableNotification[I]

    final case class PreparationErrored[I](
        tableId: I,
        start: OffsetDateTime,
        duration: Duration,
        t: Throwable) extends TableNotification[I]
  }

  /**
   * Builds a single-element stream emitting a manager whose submitted
   * preparations are run concurrently with that stream.
   *
   * @param evaluator        evaluates queries for the manager
   * @param maxStreams       bound of the submission queue, and the number of
   *                         submitted streams joined at once
   * @param maxNotifications bound of the notification queue
   * @param runToStore       produces the persisting stream for a table id and result
   *
   * When the returned stream completes, `None` is enqueued on the notification
   * queue.
   */
  @SuppressWarnings(Array("org.wartremover.warts.DefaultArguments"))
  def apply[F[_]: ConcurrentEffect, I, Q, R](
      evaluator: QueryEvaluator[F, Q, R],
      maxStreams: Int = 10,
      maxNotifications: Int = 10)(
      runToStore: (I, R) => F[Stream[F, Unit]])(
      implicit ec: ExecutionContext): Stream[F, PreparationsManager[F, I, Q, R]] = {

    for {
      pending <-
        Stream.eval(async.boundedQueue[F, Stream[F, Nothing]](maxStreams))

      notices <-
        Stream.eval(async.boundedQueue[F, Option[TableNotification[I]]](maxNotifications))

      manager = Stream(
        new PreparationsManager[F, I, Q, R](evaluator, notices, pending.enqueue1(_))(runToStore))

      // we have to be explicit here because scalaz's MonadSyntax includes .join
      running = manager.concurrently(Stream.InvariantOps(pending.dequeue).join(maxStreams))

      back <- running onComplete {
        Stream.eval_(notices.enqueue1(None))
      }
    } yield back
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment