Skip to content

Instantly share code, notes, and snippets.

@Ghurtchu
Last active April 2, 2023 22:56
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Ghurtchu/2f1d1465271a302643cd87762b3b8e7d to your computer and use it in GitHub Desktop.
Save Ghurtchu/2f1d1465271a302643cd87762b3b8e7d to your computer and use it in GitHub Desktop.
In-Memory Concurrent Key-Value Cache
/**
  * A small thread-safe, in-memory key-value cache together with the
  * compare-and-set based `Ref` it is built on.
  */
object ConcurrentInMemoryCacheImpl {

  /**
    * A mutable key-value store.
    *
    * @tparam K key type
    * @tparam V value type
    */
  trait Cache[K, V] {

    /** Type of the backing store returned by [[value]]. */
    type F

    /** Associates `v` with `k`. */
    def put(k: K, v: V): Unit

    /** Returns the value stored under `k`, or `None` if there is none. */
    def get(k: K): Option[V]

    /** Removes `k` and returns the value it was mapped to, if any. */
    def remove(k: K): Option[V]

    /** Associates `v` with `k`. */
    def update(k: K, v: V): Unit

    /** The backing store itself. */
    def value: F
  }

  object Cache {
    import scala.collection.concurrent.TrieMap

    /** Creates an empty cache backed by a `TrieMap` that is held in a [[Ref]]. */
    def apply[K, V]: Cache[K, V] = new Cache[K, V] {
      override type F = TrieMap[K, V]

      // The TrieMap is mutated in place, so the reference stored in the Ref
      // never changes: every `modify` below hands back the same map instance
      // and its compare-and-set succeeds on the first attempt. The per-key
      // thread-safety comes from TrieMap itself.
      private val mapRef = Ref(TrieMap.empty[K, V])

      override def put(k: K, v: V): Unit = mapRef.modify { map =>
        map.put(k, v)
        (map, ())
      }

      // A lookup changes nothing, so read the map directly instead of paying
      // for a compare-and-set on every read.
      override def get(k: K): Option[V] = mapRef.get.get(k)

      override def remove(k: K): Option[V] = mapRef.modify { map =>
        val removed = map.remove(k)
        (map, removed)
      }

      override def update(k: K, v: V): Unit = mapRef.modify { map =>
        map.update(k, v)
        (map, ())
      }

      // Returns the live, mutable map rather than a copy.
      override def value: TrieMap[K, V] = mapRef.get
    }

    /** A cell holding a value of type `A` that can be read and atomically replaced. */
    trait Atomically[A] {

      /** Returns the current value. */
      def get: A

      /**
        * Atomically replaces the current value.
        *
        * @param f maps the current value to a pair of (new value, result)
        * @return the result component produced by `f`
        */
      def modify[B](f: A => (A, B)): B
    }

    import java.util.concurrent.atomic.AtomicReference

    /**
      * [[Atomically]] implemented as a compare-and-set retry loop over an
      * `AtomicReference`.
      */
    final class Ref[A](ar: AtomicReference[A]) extends Atomically[A] {

      /**
        * Applies `f` to the current value and installs the new value with
        * `compareAndSet`. If another thread replaced the value in the
        * meantime, the attempt is retried from a fresh read, so `f` may run
        * more than once.
        */
      override def modify[B](f: A => (A, B)): B = {
        @scala.annotation.tailrec
        def spin(): B = {
          val oldState = get
          val (newState, b) = f(oldState)
          // compareAndSet compares references, so it succeeds only while
          // `oldState` is still the installed instance.
          if (ar.compareAndSet(oldState, newState)) b else spin()
        }
        spin()
      }

      override def get: A = ar.get
    }

    object Ref {

      /** Creates a [[Ref]] whose initial value is `a`. */
      def apply[A](a: => A): Ref[A] = new Ref[A](new AtomicReference[A](a))
    }
  }
}
@Ghurtchu
Copy link
Author

Ghurtchu commented Apr 2, 2023

Let's test the atomicity of Ref and run 10 concurrent threads, where:

  • each thread starts 20 child threads in total
  • 10 of which try to increment Int inside Ref 10,000 times
  • and another 10 try to decrement Int inside Ref 10,000 times

The expected output is 0 for every Ref, since each println prints the value held by the Ref after all increments and decrements:

/**
  * Starts `amount` threads, each of which evaluates `task` 10,000 times.
  *
  * The threads are started but never joined, so this method returns while
  * they may still be running.
  *
  * @param amount number of threads to start
  * @param task   by-name expression re-evaluated on every iteration
  */
def runThreads[Task](amount: Int, task: => Task): Unit =
  // `1 to 10_000` is exactly 10,000 iterations; `0 to 10_000` would be 10,001.
  List.fill(amount)(new Thread(() => (1 to 10_000).foreach(_ => task)))
    .foreach(_.start())

/**
  * Evaluates `task` once on each of ten newly started threads.
  *
  * All ten threads are created first and then started; none of them is
  * joined, so the call returns without waiting for `task` to finish.
  */
def concurrently[Task](task: => Task): Unit = {
  val workers = List.fill(10)(new Thread(() => task))
  workers.foreach(worker => worker.start())
}
       
// Atomicity check for Ref: every outer thread owns its own counter and
// hammers it with matching increments and decrements.
concurrently {
  val counter = Ref(0)

  // `modify` also returns the previous value; it is ignored here.
  runThreads(10, counter.modify(current => (current + 1, current)))
  runThreads(10, counter.modify(current => (current - 1, current)))

  // The worker threads are not joined, so wait a fixed time before reading.
  Thread.sleep(3000)

  println(counter.get)
}

Output: 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 - passed

@Ghurtchu
Copy link
Author

Ghurtchu commented Apr 2, 2023

Let's try the same with a plain mutable @volatile variable:

// Same experiment with a plain @volatile variable instead of a Ref.
concurrently {
  @volatile var counter = 0

  // `+=` / `-=` are a separate read and write, not one atomic step.
  runThreads(10, counter += 1)
  runThreads(10, counter -= 1)

  // The worker threads are not joined, so wait a fixed time before reading.
  Thread.sleep(3000)

  println(counter)
}

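Output: -15173, -4728, 8123, 12421, 5766, 7568, -904, -34940, 1254, 200 - it did not pass even once. @volatile only guarantees that writes are visible to other threads; num += 1 is still a separate read and write, so concurrent updates get lost.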

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment