Created
July 23, 2018 17:45
-
-
Save djspiewak/f1368e7cac954283e78dffefb33f8b7e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2014–2018 SlamData Inc. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package quasar.impl.table | |
import slamdata.Predef._ | |
import quasar.Condition | |
import quasar.api.QueryEvaluator | |
import cats.effect.{ConcurrentEffect, Effect} | |
import fs2.{async, Stream} | |
import fs2.async.mutable.Queue | |
// monad/traverse syntax conflict | |
import scalaz.{Equal, OptionT, Scalaz}, Scalaz._ | |
import shims._ | |
import scala.collection.JavaConverters._ | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.duration._ | |
import java.time.OffsetDateTime | |
import java.util.concurrent.ConcurrentHashMap | |
class PreparationsManager[F[_]: Effect, I, Q, R] private ( | |
evaluator: QueryEvaluator[F, Q, R], | |
notificationsQ: Queue[F, Option[PreparationsManager.TableNotification[I]]], | |
background: Stream[F, Nothing] => F[Unit])( | |
runToStore: (I, R) => F[Stream[F, Unit]])( | |
implicit ec: ExecutionContext) { | |
import PreparationsManager._ | |
private val ongoing: ConcurrentHashMap[I, Preparation[F]] = | |
new ConcurrentHashMap[I, Preparation[F]] | |
val F = Effect[F] | |
val notifications = notificationsQ.dequeue.unNoneTerminate | |
// fake Equal for fake parametricity | |
def prepareTable(tableId: I, query: Q)(implicit I: Equal[I]): F[Unit] = { | |
for { | |
result <- evaluator.evaluate(query) | |
persist <- runToStore(tableId, result) | |
s <- async.signalOf[F, Boolean](false) | |
configured = Stream.eval(F.delay(OffsetDateTime.now())) flatMap { start => | |
val preparation = Preparation(s.set(true), start) | |
val halted = persist.interruptWhen(s) onComplete { | |
val eff = for { | |
canceled <- s.get | |
_ <- if (canceled) { | |
().point[F] | |
} else { | |
for { | |
end <- F.delay(OffsetDateTime.now()) | |
_ <- notificationsQ.enqueue1( | |
Some( | |
TableNotification.PreparationSucceeded( | |
tableId, | |
start, | |
(end.toEpochSecond - start.toEpochSecond).millis))) | |
} yield () | |
} | |
} yield () | |
Stream.eval_(eff) | |
} | |
val handled = halted handleErrorWith { t => | |
val eff = for { | |
end <- F.delay(OffsetDateTime.now()) | |
_ <- notificationsQ.enqueue1( | |
Some( | |
TableNotification.PreparationErrored( | |
tableId, | |
start, | |
(end.toEpochSecond - start.toEpochSecond).millis, | |
t))) | |
} yield () | |
Stream.eval_(eff) | |
} | |
Stream.bracket( | |
F.delay(ongoing.put(tableId, preparation)))( | |
_ => handled, | |
_ => F.delay(ongoing.remove(tableId, preparation)).as(())) | |
} | |
_ <- background(configured.drain) | |
} yield () | |
} | |
def preparationStatus(tableId: I)(implicit I: Equal[I]): F[Option[OffsetDateTime]] = | |
F.delay(Option(ongoing.get(tableId)).map(_.start)) | |
def cancelPreparation(tableId: I): F[Condition[NotInProgressError[I]]] = { | |
val eff = for { | |
live <- OptionT(F.delay(Option(ongoing.get(tableId)))) | |
removed <- F.delay(ongoing.remove(tableId, live)).liftM[OptionT] | |
_ <- if (removed) | |
live.cancel.liftM[OptionT] | |
else | |
().point[OptionT[F, ?]] | |
} yield () | |
eff.run map { | |
case Some(_) => Condition.Normal() | |
case None => Condition.Abnormal(NotInProgressError(tableId)) | |
} | |
} | |
@SuppressWarnings(Array("org.wartremover.warts.Recursion")) | |
def cancelAll: F[Unit] = { | |
for { | |
keys <- F.delay(ongoing.keys.asScala.toList) | |
results <- keys.traverse(cancelPreparation) | |
_ <- if (results.isEmpty) | |
().point[F] | |
else | |
cancelAll // handle race condition of new enqueue | |
} yield () | |
} | |
} | |
object PreparationsManager { | |
@SuppressWarnings(Array("org.wartremover.warts.DefaultArguments")) | |
def apply[F[_]: ConcurrentEffect, I, Q, R]( | |
evaluator: QueryEvaluator[F, Q, R], | |
maxStreams: Int = 10, | |
maxNotifications: Int = 10)( | |
runToStore: (I, R) => F[Stream[F, Unit]])( | |
implicit ec: ExecutionContext): Stream[F, PreparationsManager[F, I, Q, R]] = { | |
for { | |
q <- | |
Stream.eval(async.boundedQueue[F, Stream[F, Nothing]](maxStreams)) | |
notificationsQ <- | |
Stream.eval(async.boundedQueue[F, Option[TableNotification[I]]](maxNotifications)) | |
emit = Stream(new PreparationsManager[F, I, Q, R](evaluator, notificationsQ, q.enqueue1(_))(runToStore)) | |
// we have to be explicit here because scalaz's MonadSyntax includes .join | |
back <- emit.concurrently(Stream.InvariantOps(q.dequeue).join(maxStreams)) onComplete { | |
Stream.eval_(notificationsQ.enqueue1(None)) | |
} | |
} yield back | |
} | |
private final case class Preparation[F[_]]( | |
cancel: F[Unit], | |
start: OffsetDateTime) | |
final case class InProgressError[I](tableId: I) | |
final case class NotInProgressError[I](tableId: I) | |
sealed trait TableNotification[I] extends Product with Serializable | |
object TableNotification { | |
final case class PreparationErrored[I]( | |
tableId: I, | |
start: OffsetDateTime, | |
duration: Duration, | |
t: Throwable) extends TableNotification[I] | |
final case class PreparationSucceeded[I]( | |
tableId: I, | |
start: OffsetDateTime, | |
duration: Duration) extends TableNotification[I] | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment