Skip to content

Instantly share code, notes, and snippets.

@vmarquez
Last active November 28, 2021 12:33
Show Gist options
  • Star 5 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vmarquez/5667098 to your computer and use it in GitHub Desktop.
Save vmarquez/5667098 to your computer and use it in GitHub Desktop.
A mini STM if you will. I've made a'Transactional' map that mutates in a referentially transparent way.
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.CountDownLatch
import scala.concurrent.Future
import scala.concurrent.ExecutionContext
import ExecutionContext.Implicits.global
object TxMapTest {
/*
* Example Usage
* We want to show two threads working with the same data source having both of their effects succeed
* without a possible data race that could occur when using a var with immutable map or a ConcurrentHahsMap
*/
def apply() {
case class User(name: String, money: Int)
val vmap = new TxStateMap[Int,User]()
val finishedLatch = new CountDownLatch(1)
val atomic =
for {
_ <- vmap.set(1, User("runT1ME", 5))
_ <- vmap.set(2, User("tpolecat", 10))
_ <- vmap.set(3, User("dibblego", 15))
} yield ()
vmap.commit(atomic)
val f1 =
Future {
val atomic2 =
for {
runT1ME <- vmap.get(1)
tpolecat <- vmap.get(2)
_ <- vmap.set(1,runT1ME.copy(money=runT1ME.money-2))
tv <- vmap.set(2, tpolecat.copy(money=tpolecat.money+2))
} yield(tv)
vmap.commit(atomic2)
}
val f2 =
Future {
val atomic3 =
for {
runT1ME <- vmap.get(1)
tpolecat <- vmap.get(2)
dibblego <- vmap.get(3)
_ <- vmap.set(1, runT1ME.copy(money=runT1ME.money-1))
_ <- vmap.set(2, tpolecat.copy(money=tpolecat.money-1))
dr <- vmap.set(3, dibblego.copy(money=dibblego.money-1))
} yield (dr)
vmap.commit(atomic3)
}
val result = Future.sequence(List(f1, f2)).map( a => a.sequence )
result.map(io => //this is a bit weird since we haven't rectified Future/IO yet
(for {
li <- io
frozen <- vmap.frozen
_ = println("changed values = " + li)
_ = println("frozen = " + frozen)
_ = finishedLatch.countDown()
} yield ()).unsafePerformIO )
finishedLatch.await()
//if vmap was an immutable map, we could possibly only see one set of actions
//if vmap was a mutable atomic map, we mgiht miss some of the actions of one while others succeed
}
}
class TxStateMap[K, V] {
private val aref = new AtomicReference(Map[K,V]())
private def txState(f: Map[K,V]=>(Map[K,V],V)): State[Map[K,V], V] = State(f)
def get(k: K) = txState((m:Map[K,V]) => (m, m(k)))
def set(k: K, v:V) = txState((m:Map[K,V]) => (m + (k->v), v))
def commit[A](s: State[Map[K,V], A]): IO[A] = {
val m = aref.get()
val result = s(m)
if (!aref.compareAndSet(m, result._1))
commit(s)
else
IO { result._2 }
}
def frozen = IO { aref.get }
}
/*
For reference, State is more or less this:
case class State[A, B](run: A => (A,B)) {
def map[C](f: B=>C): State[A,C] = State[A, C]( ina => {
this.run(ina) match {
case (a, b) => (a, f(b))
}
})
def flatMap[C](f: B=>State[A,C]) = State[A, C]( ina => {
this.run(ina) match {
case (a, b) => f(b).run(a)
}
})
}
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment