Created
June 28, 2017 19:23
Engineering thread safe updates (getAndSet) to a TrieMap with periodic flushing of its contents.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SoldUnitsCache { | |
type Item = String | |
type Units = Long | |
val cache = TrieMap[Item, Units]() | |
val cacheLock = new ReentrantReadWriteLock() | |
def updateSoldUnits(item: Item, soldUnits: Units): Unit = { | |
// The readlock here ensures that multiple updates can happen concurrently | |
// while still blocking them when flusher wants to snapshot and clear the map. | |
cache.readLock { | |
cache.putIfAbsent(item, 0) | |
var oldValue = cache(item) | |
var newValue = oldValue + soldUnits | |
while (!cache.replace(item, oldValue, newValue) { | |
oldValue = cache(item) | |
newValue = oldValue + soldUnits | |
} | |
} | |
} | |
val flusher = new PeriodicTask(flush) | |
flusher.start() | |
def flush(): Unit = { | |
// The write lock here prevents any updates from happening while we snapshot | |
// and clear the cache. | |
cacheLock.writeLock { | |
val snapshot = cache.asMap | |
cache.clear | |
} | |
durablePersist(snapshot) | |
} | |
def getSoldUnits(item: Item): Long = { | |
// No need to acquire any lock | |
cache.get(item).getOrElse(0) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment