Skip to content

Instantly share code, notes, and snippets.

@arturaz
Created June 8, 2023 11:51
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save arturaz/a8da51748eb59c744c72f10aa3257ee2 to your computer and use it in GitHub Desktop.
Save arturaz/a8da51748eb59c744c72f10aa3257ee2 to your computer and use it in GitHub Desktop.
extension [F[_], A](resource: Resource[F, A]) {
  /**
   * Wraps this resource into a reference-counted resource that is shared between all of its concurrent users.
   *
   * The underlying resource is allocated when a user acquires the returned resource while nobody else holds it, and
   * it is finalized when the number of users drops back to zero. Every user in between receives the very same
   * allocated value. Acquiring again after the count has dropped to zero performs a fresh allocation.
   *
   * Running the outer `F` only creates the bookkeeping state; the underlying resource is not allocated until the
   * returned [[Resource]] is acquired.
   *
   * @return an effect producing the shared [[Resource]]
   */
  def memoizeShared(using F: Async[F], syncAsASync: SyncAsASync[F, F]): F[Resource[F, A]] = {
    /** Bookkeeping of the live allocation: how many users hold it, the allocated value and its finalizer. */
    case class State(users: Int, resource: A, release: F[Unit])

    /** Registers one more user, allocating the underlying resource when there is no live allocation. */
    def join(stateRef: SynchronizedRef[F, F, Option[State]]): F[A] =
      stateRef.modify[A] {
        case Some(live) =>
          // Somebody already holds the resource: bump the user count and hand out the same value.
          val joined: Option[State] = Some(live.modify(_.users).using(_ + 1))
          F.pure((joined, live.resource))
        case None =>
          // First user: allocate and remember the finalizer for later.
          resource.allocated.map { allocation =>
            val fresh = State(1, allocation._1, allocation._2)
            val stored: Option[State] = Some(fresh)
            (stored, fresh.resource)
          }
      }.map(_._2)

    /** Unregisters one user, running the stored finalizer when that user was the last one. */
    def leave(stateRef: SynchronizedRef[F, F, Option[State]]): F[Unit] =
      stateRef.modify[Unit] {
        case Some(live) =>
          val decremented = live.modify(_.users).using(_ - 1)
          if (decremented.users != 0) {
            // Other users remain: only record the lower count.
            val remaining: Option[State] = Some(decremented)
            F.pure((remaining, ()))
          } else {
            // Last user is gone: finalize and clear the state so that the next user re-allocates.
            // NOTE(review): the state is only cleared when the finalizer succeeds — confirm that a failing
            // finalizer leaving the previous state in place is the intended behaviour.
            val cleared: Option[State] = None
            decremented.release.as((cleared, ()))
          }
        case None =>
          // This should never happen: a release always follows a successful acquire.
          F.raiseError(new IllegalStateException("Resource was released but it was never allocated."))
      }.map(_._2)

    SynchronizedRef[F, F].of(Option.empty[State]).map { stateRef =>
      Resource.make(join(stateRef))(_ => leave(stateRef))
    }
  }
}
package app.data
import cats.effect.kernel.Resource
import cats.effect.std.Semaphore
import cats.effect.{Concurrent, IO, Ref, Sync, SyncIO}
import cats.implicits.{toFlatMapOps, toFunctorOps}
import cats.{Applicative, Functor, Monad, ~>}
/**
 * A [[Ref]]-like holder of an [[A]] which guards concurrent access with a lock instead of compare-and-swap.
 *
 * [[ASyncF]] is the effect type of the lock-use-unlock operations. It is asynchronous because obtaining the lock
 * may require waiting.
 *
 * [[SyncF]] is the effect type of the operations that do not obtain the lock first.
 */
trait SynchronizedRef[ASyncF[_], SyncF[_], A]
  extends SynchronizedRefSync[SyncF, A] with SynchronizedRefASync[ASyncF, A]
{ self =>
  /**
   * Builds a view of this [[SynchronizedRef]] that exposes a [[B]] instead of the stored [[A]].
   *
   * Reads go through `mapper`; writes made through the view are folded back into the stored value with
   * `contraMapper`. All operations of the view delegate to this instance.
   *
   * @param mapper       extracts the [[B]] from the stored [[A]]
   * @param contraMapper receives the stored [[A]] and the updated [[B]] and produces the [[A]] to write back
   * @param syncFunctor  used to map the lock-free read
   */
  def bimap[B](mapper: A => B)(
    contraMapper: (A, B) => A
  )(implicit syncFunctor: Functor[SyncF]): SynchronizedRef[ASyncF, SyncF, B] = new SynchronizedRef[ASyncF, SyncF, B] {
    override protected implicit def asyncConcurrent = self.asyncConcurrent

    override lazy val resource: Resource[ASyncF, B] = self.resource.map(mapper)

    override def getImmediate = self.getImmediate.map(mapper)

    override def modify[Result](f: B => ASyncF[(B, Result)]) =
      self
        .modify[(B, Result)] { whole =>
          // Carry the view-level outcome along as the result so it can be returned once `whole` is rewritten.
          f(mapper(whole)).map(outcome => (contraMapper(whole, outcome._1), outcome))
        }
        .map(_._2)

    override def modifyMaybeEffect[Result](f: B => ASyncF[Option[ASyncF[(B, Result)]]]) =
      self
        .modifyMaybeEffect[(B, Result)] { whole =>
          f(mapper(whole)).map { maybeUpdate =>
            maybeUpdate.map { update =>
              update.map(outcome => (contraMapper(whole, outcome._1), outcome))
            }
          }
        }
        .map(committed => committed.map(_._2))
  }
}
/**
 * Parts of [[SynchronizedRef]] which do not acquire the lock first.
 *
 * [[SyncF]] is the effect type in which these lock-free operations are expressed and [[A]] is the type of the
 * stored data.
 */
trait SynchronizedRefSync[SyncF[_], A] {
/**
 * Fetches the data without acquiring the lock first.
 *
 * @return an effect in [[SyncF]] yielding the stored value
 */
def getImmediate: SyncF[A]
}
/** Parts of [[SynchronizedRef]] which acquire the lock. */
trait SynchronizedRefASync[ASyncF[_], A] {
  /** Type class evidence the derived operations below use to sequence [[ASyncF]] effects. */
  protected implicit def asyncConcurrent: Concurrent[ASyncF]

  /** Takes the lock, reads the data and gives the lock back. */
  def get: ASyncF[A] = use(value => Applicative[ASyncF].pure(value))

  /** Takes the lock, reads the data, runs `f` on it and gives the lock back. */
  def use[B](f: A => ASyncF[B]): ASyncF[B] = resource.use(f)

  /**
   * [[Resource]] that acquires a lock and fetches the data upon [[Resource.use]] and releases the lock once it is
   * finished.
   */
  lazy val resource: Resource[ASyncF, A]

  /**
   * Takes the lock, reads the data, runs the effect produced by `f`, stores the new data from its result and gives
   * the lock back.
   *
   * @return the stored data together with the result produced by `f`
   */
  def modify[Result](f: A => ASyncF[(A, Result)]): ASyncF[(A, Result)]

  /** Variant of [[modify]] for updates that have no result; yields the new data. */
  def modify_(f: A => ASyncF[A]): ASyncF[A] = {
    val withUnitResult: A => ASyncF[(A, Unit)] =
      current => f(current).map(updated => (updated, ()))
    modify(withUnitResult).map(_._1)
  }

  /**
   * Like [[modify]], but `f` may return [[None]] to stop early without writing anything back.
   *
   * @return [[Some]] with the new data and result if the update was performed, [[None]] otherwise
   */
  def modifyMaybe[Result](f: A => Option[ASyncF[(A, Result)]]): ASyncF[Option[(A, Result)]] =
    modifyMaybeEffect[Result](current => Applicative[ASyncF].pure(f(current)))

  /** Variant of [[modifyMaybe]] for updates that have no result. */
  def modifyMaybe_(f: A => Option[ASyncF[A]]): ASyncF[Option[A]] = {
    val withUnitResult: A => Option[ASyncF[(A, Unit)]] =
      current => f(current).map(update => update.map(updated => (updated, ())))
    modifyMaybe(withUnitResult).map(committed => committed.map(_._1))
  }

  /** Like [[modifyMaybe]], but deciding whether to update requires running an effect first. */
  def modifyMaybeEffect[Result](f: A => ASyncF[Option[ASyncF[(A, Result)]]]): ASyncF[Option[(A, Result)]]

  /** Variant of [[modifyMaybeEffect]] for updates that have no result. */
  def modifyMaybeEffect_(f: A => ASyncF[Option[ASyncF[A]]]): ASyncF[Option[A]] = {
    val withUnitResult: A => ASyncF[Option[ASyncF[(A, Unit)]]] =
      current => f(current).map(decision => decision.map(update => update.map(updated => (updated, ()))))
    modifyMaybeEffect(withUnitResult).map(committed => committed.map(_._1))
  }
}
object SynchronizedRef {
  /**
   * Starts building a [[SynchronizedRef]] with both effect types fixed up front.
   *
   * This builder uses the
   * [[https://typelevel.org/cats/guidelines.html#partially-applied-type Partially-Applied Type]]
   * technique, so only the value type is left to be inferred at the call to [[ApplyBuilders.of]].
   *
   * {{{
   * SynchronizedRef[IO, SyncIO].of(10) <-> SynchronizedRef.of[IO, SyncIO, Int](10)
   * }}}
   *
   * @see [[of]]
   */
  def apply[ASyncF[_], SyncF[_]](
    implicit concurrent: Concurrent[ASyncF], sync: Sync[SyncF]
  ): ApplyBuilders[ASyncF, SyncF] =
    new ApplyBuilders[ASyncF, SyncF](concurrent, sync)

  /**
   * Converts the synchronous type [[SyncF]] to its asynchronous counterpart [[ASyncF]]. This is essentially [[~>]],
   * but it gets a dedicated trait so that implicit resolution picks it up automatically.
   */
  trait SyncAsASync[SyncF[_], ASyncF[_]] extends (SyncF ~> ASyncF)

  object SyncAsASync {
    /** [[IO]] already is its own asynchronous counterpart, so the effect is passed through untouched. */
    given SyncAsASync[IO, IO] = new SyncAsASync[IO, IO] {
      override def apply[T](f: IO[T]): IO[T] = f
    }

    /** Lifts [[SyncIO]] into [[IO]]. */
    given catsIO: SyncAsASync[SyncIO, IO] = new SyncAsASync[SyncIO, IO] {
      override def apply[T](f: SyncIO[T]): IO[T] = f.to[IO]
    }
  }

  /**
   * Creates a new instance of [[SynchronizedRef]].
   *
   * The value lives in a [[Ref]] created in [[SyncF]] and the lock is a [[Semaphore]] with a single permit.
   *
   * @param initial     the value the reference starts with
   * @param syncAsASync lifts the [[SyncF]] operations on the underlying [[Ref]] into [[ASyncF]]
   */
  def of[ASyncF[_] : Concurrent, SyncF[_] : Sync, A](
    initial: A
  )(implicit syncAsASync: SyncAsASync[SyncF, ASyncF]): ASyncF[SynchronizedRef[ASyncF, SyncF, A]] = {
    syncAsASync(Ref[SyncF].of(initial)).flatMap { ref =>
      Semaphore[ASyncF](1).map { semaphore =>
        new SynchronizedRef[ASyncF, SyncF, A] {
          override protected def asyncConcurrent = implicitly

          /** Reads the underlying [[Ref]], lifted into [[ASyncF]]. Does not touch the lock. */
          private def readCurrent: ASyncF[A] = syncAsASync(ref.get)

          /** Overwrites the underlying [[Ref]], lifted into [[ASyncF]]. Does not touch the lock. */
          private def store(value: A): ASyncF[Unit] = syncAsASync(ref.set(value))

          /** Runs `body` while holding the only permit of the semaphore. */
          private def withPermit[B](body: => ASyncF[B]): ASyncF[B] = semaphore.permit.use(_ => body)

          //region SynchronizedRefSync

          override def getImmediate = ref.get

          //endregion

          //region SynchronizedRefASync

          override lazy val resource: Resource[ASyncF, A] =
            semaphore.permit.evalMap(_ => readCurrent)

          override def modify[Result](f: A => ASyncF[(A, Result)]) =
            withPermit {
              for {
                current <- readCurrent
                outcome <- f(current)
                // The new value is written only after the effect produced by `f` has succeeded.
                _ <- store(outcome._1)
              } yield outcome
            }

          override def modifyMaybeEffect[Result](f: A => ASyncF[Option[ASyncF[(A, Result)]]]) =
            withPermit {
              readCurrent.flatMap { current =>
                f(current).flatMap {
                  case Some(update) =>
                    update.flatMap { outcome =>
                      syncAsASync(ref.set(outcome._1).map(_ => Some(outcome)))
                    }
                  case None =>
                    // Early termination: nothing is written back.
                    Monad[ASyncF].pure(None)
                }
              }
            }

          //endregion
        }
      }
    }
  }

  /** Second stage of the partially-applied builder returned by [[apply]]; carries the captured evidence. */
  final class ApplyBuilders[ASyncF[_], SyncF[_]](
    concurrent: Concurrent[ASyncF], sync: Sync[SyncF]
  ) {
    /** Creates a new instance of [[SynchronizedRef]]. */
    def of[A](a: A)(implicit syncAsASync: SyncAsASync[SyncF, ASyncF]): ASyncF[SynchronizedRef[ASyncF, SyncF, A]] =
      // NOTE(review): the evidence is passed positionally and assumes the compiler places the context-bound
      // evidence of `SynchronizedRef.of` before `syncAsASync` in the implicit clause — confirm when changing
      // compiler versions.
      SynchronizedRef.of[ASyncF, SyncF, A](a)(concurrent, sync, syncAsASync)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment