Skip to content

Instantly share code, notes, and snippets.

Created June 28, 2017 19:23
Engineering thread-safe updates (getAndSet) to a TrieMap with periodic flushing of its contents.
/** In-memory accumulator of sold units per item, periodically flushed to durable storage.
  *
  * Updates take the shared (read) side of `cacheLock`, so many updates can run
  * concurrently; `flush` takes the exclusive (write) side, so no update can
  * interleave between copying the map and clearing it.
  */
class SoldUnitsCache {
  type Item = String
  type Units = Long

  val cache: TrieMap[Item, Units] = TrieMap.empty[Item, Units]
  val cacheLock = new ReentrantReadWriteLock()

  /** Runs `body` while holding `lock`, releasing it even if `body` throws. */
  private def withLock[A](lock: java.util.concurrent.locks.Lock)(body: => A): A = {
    lock.lock()
    try body
    finally lock.unlock()
  }

  /** Adds `soldUnits` to the running total for `item`, creating the entry if needed.
    *
    * @param item      the item whose counter is updated
    * @param soldUnits the number of units to add to the current total
    */
  def updateSoldUnits(item: Item, soldUnits: Units): Unit = {
    // Lock-free compare-and-swap loop: a failed `replace` means another updater
    // changed the value in between, so re-read and retry.
    @scala.annotation.tailrec
    def addUnits(): Unit =
      cache.putIfAbsent(item, soldUnits) match {
        case None => () // first update for this item: the entry now holds soldUnits
        case Some(current) =>
          if (!cache.replace(item, current, current + soldUnits)) addUnits()
      }

    // The read lock here ensures that multiple updates can happen concurrently
    // while still blocking them when the flusher wants to snapshot and clear the map.
    withLock(cacheLock.readLock()) {
      addUnits()
    }
  }

  // Declared after `cache` and `cacheLock` so both are initialized before the
  // task can first invoke `flush`.
  val flusher = new PeriodicTask(flush)
  flusher.start()

  /** Copies the current totals, clears the cache, then persists the copy. */
  def flush(): Unit = {
    // The write lock here prevents any updates from happening while we snapshot
    // and clear the cache. `toMap` makes an immutable copy, so the subsequent
    // `clear()` cannot affect what gets persisted.
    val snapshot: Map[Item, Units] = withLock(cacheLock.writeLock()) {
      val contents = cache.toMap
      cache.clear()
      contents
    }
    // Persist outside the lock so updates are not blocked on storage.
    // NOTE(review): assumes durablePersist accepts a Map[Item, Units] — confirm.
    durablePersist(snapshot)
  }

  /** Returns the units accumulated for `item` since the last flush, or 0 if none.
    *
    * @param item the item to look up
    * @return the current in-memory total for `item`
    */
  def getSoldUnits(item: Item): Long = {
    // No need to acquire any lock: a single TrieMap read is thread-safe. A read
    // racing with `flush` simply sees the value before or after the clear.
    cache.getOrElse(item, 0L)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment