Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
A wrapper on ConcurrentHashMap to use with cats.effect.Ref
package org.bykn.refmap
import cats.data.State
import cats.effect.Sync
import cats.effect.concurrent.Ref
import java.util.concurrent.ConcurrentHashMap
import cats.implicits._
/**
* This is a total Map from K to Ref[F, V].
* this allows us to use the Ref api backed by a ConcurrentHashMap
*
* This uses java universal hashCode and equality on K
*/
trait RefMap[F[_], K, V] {
def apply(k: K): Ref[F, V]
}
object RefMap {
private class Impl[F[_], K, V](chm: ConcurrentHashMap[K, V], sync: Sync[F])
extends RefMap[F, K, Option[V]] {
private implicit def syncF: Sync[F] = sync
val fnone0: F[None.type] = sync.pure(None)
def fnone[A]: F[Option[A]] = fnone0.widen[Option[A]]
class HandleRef(k: K) extends Ref[F, Option[V]] {
def access: F[(Option[V], Option[V] => F[Boolean])] =
sync.delay {
val init = chm.get(k)
if (init == null) {
val set: Option[V] => F[Boolean] = { opt: Option[V] =>
opt match {
case None => sync.delay(!chm.containsKey(k))
case Some(newV) =>
sync.delay {
// it was initially empty
chm.putIfAbsent(k, newV) == null
}
}
}
(None, set)
} else {
val set: Option[V] => F[Boolean] = { opt: Option[V] =>
opt match {
case None =>
sync.delay(chm.remove(k, init))
case Some(newV) =>
sync.delay(chm.replace(k, init, newV))
}
}
(Some(init), set)
}
}
def get: F[Option[V]] =
sync.delay {
Option(chm.get(k))
}
def getAndSet(a: Option[V]): F[Option[V]] =
a match {
case None =>
sync.delay(Option(chm.remove(k)))
case Some(v) =>
sync.delay(Option(chm.put(k, v)))
}
def modify[B](f: Option[V] => (Option[V], B)): F[B] = {
lazy val loop: F[B] = tryModify(f).flatMap {
case None => loop
case Some(b) => sync.pure(b)
}
loop
}
def modifyState[B](state: State[Option[V], B]): F[B] =
modify(state.run(_).value)
def set(a: Option[V]): F[Unit] =
a match {
case None => sync.delay { chm.remove(k); () }
case Some(v) => sync.delay { chm.put(k, v); () }
}
def tryModify[B](f: Option[V] => (Option[V], B)): F[Option[B]] =
// we need the suspend because we do effects inside
sync.suspend {
val init = chm.get(k)
if (init == null) {
f(None) match {
case (None, b) =>
// no-op
sync.pure(Some(b))
case (Some(newV), b) =>
if (chm.putIfAbsent(k, newV) == null) sync.pure(Some(b))
else fnone
}
} else {
f(Some(init)) match {
case (None, b) =>
if (chm.remove(k, init)) sync.pure(Some(b))
else fnone
case (Some(next), b) =>
if (chm.replace(k, init, next)) sync.pure(Some(b))
else fnone
}
}
}
def tryModifyState[B](state: State[Option[V], B]): F[Option[B]] =
tryModify(state.run(_).value)
def tryUpdate(f: Option[V] => Option[V]): F[Boolean] =
tryModify { opt =>
(f(opt), ())
}.map(_.isDefined)
def update(f: Option[V] => Option[V]): F[Unit] = {
lazy val loop: F[Unit] = tryUpdate(f).flatMap {
case true => sync.unit
case false => loop
}
loop
}
}
def apply(k: K): Ref[F, Option[V]] = new HandleRef(k)
}
/**
* This allocates mutable memory, so it has to be inside F. The way to use things like this is to
* allocate one then `.map` them inside of constructors that need to access them.
*
* It is usually a mistake to have a `F[RefMap[F, K, V]]` field. You want `RefMap[F, K, V]` field
* which means the thing that needs it will also have to be inside of `F[_]`, which is because
* it needs access to mutable state so allocating it is also an effect.
*/
def fromConcurrentHashMap[F[_]: Sync, K, V](
initialCapacity: Int = 16,
loadFactor: Float = 0.75f,
concurrencyLevel: Int = 16): F[RefMap[F, K, Option[V]]] =
Sync[F].delay(
new Impl[F, K, V](new ConcurrentHashMap[K, V](initialCapacity, loadFactor, concurrencyLevel),
Sync[F]))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment