ConcurrentMultiMap.scala
Scala
package my.concurrent.multimap

import scala.reflect.Manifest

import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap}
import java.util.{Set => JSet}
import scala.collection.JavaConversions._
import annotation.tailrec

class Index[K <: AnyRef,V <: AnyRef : Manifest] {
  private val Naught = Array[V]() //Nil for Arrays
  private val container = new ConcurrentHashMap[K, JSet[V]]

  def put(key: K, value: V) {

    //Returns whether it needs to be retried or not
    def tryPut(set: JSet[V], v: V): Boolean = {
      set.synchronized { //Synchronize on the set to avoid clashes with remove
        if (!set.isEmpty) { //Set will be empty here if there's a pending remove (delete)
          set add v
          false
        } else true
      }
    }

    @tailrec def syncPut(k: K, v: V): Boolean = {
      var retry = false
      val set = container get k
      if (set ne null) retry = tryPut(set,v)
      else {
        val newSet = new ConcurrentSkipListSet[V]
        newSet add v

        // Parry for two simultaneous putIfAbsent(id,newSet)
        val oldSet = container.putIfAbsent(k,newSet)
        if (oldSet ne null)
          retry = tryPut(oldSet,v)
      }

      if (retry) syncPut(k,v)
      else true
    }

    syncPut(key,value)
  }

  def values(key: K) = {
    val set: JSet[V] = container get key
    if (set ne null) set toArray Naught
    else Naught
  }

  def foreach(key: K)(fun: (V) => Unit) {
    val set = container get key
    if (set ne null)
      set foreach fun
  }

  def foreach(fun: (K,V) => Unit) {
    container.entrySet foreach {
      (e) => e.getValue.foreach(fun(e.getKey,_))
    }
  }

  def remove(key: K, value: V) {
    val set = container get key
    if (set ne null) {
      set.synchronized { //Synchronize on the set to avoid conflicts with put
        set remove value
        if (set.isEmpty)
          container remove key
      }
    }
  }

  def clear = container.clear
}
ConcurrentMultiMap2.scala
Scala
import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap}
import java.util.{Set => JSet}
import annotation.tailrec

class Index[K <: AnyRef,V <: AnyRef : Manifest] {
  import scala.collection.JavaConversions._

  private val Naught = Array[V]() //Nil for Arrays
  private val container = new ConcurrentHashMap[K, JSet[V]]
  private val emptySet = new ConcurrentSkipListSet[V]

  @tailrec protected final def spinPut(key: K, value: V, set: JSet[V]) {
    set add value //Add the value to the set
    val oldSet = container.putIfAbsent(key,set) //Add the set to the container (cancels out any removes)
    if((oldSet ne null) && (oldSet ne set)) //If clash with other new add, first commit wins and the other tries again
      spinPut(key,value,oldSet)
  }

  def put(key: K, value: V) {
    val set = container get key
    spinPut(key,value, if (set ne null) set else new ConcurrentSkipListSet[V])
  }

  def values(key: K) = {
    val set: JSet[V] = container get key
    if (set ne null) set toArray Naught
    else Naught
  }

  def foreach(key: K)(fun: (V) => Unit) {
    val set = container get key
    if (set ne null)
      set foreach fun
  }

  def foreach(fun: (K,V) => Unit) {
    container.entrySet foreach {
      (e) => e.getValue.foreach(fun(e.getKey,_))
    }
  }

  def remove(key: K, value: V) {
    val set = container get key
    if ((set ne null) && (set remove value))
      container.remove(key,emptySet)
  }

  def clear = container.clear
}

Main objection is a lack of clarity / intent:

  • The !set.isEmpty check relies on the not-so-obvious fact that an empty set will only exist in the container if it's about to be removed by another thread. This should be commented to improve readability.
  • When reading tryPut() alone, the synchronization on the set seems unnecessary, and its purpose does not become apparent until you look at remove(). I'd move the methods next to each other and comment on the relationship between the synchronization blocks.
  • The return type of put() is not easy to spot. The usual Map.put() return type doesn't make sense, but perhaps a boolean like Set.add() would be useful?

For performance reasons, it would be nice if the synchronized blocks could be avoided - at least for all puts and removes. A quick win in remove() would be:

set remove value
if (set.isEmpty) set.synchronized {
    if (set.isEmpty) container remove key
}
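
The quick win above can be sketched as a standalone function. This is only an illustration, not the gist's code: the object `RemoveSketch`, the helper `removeValue` and the `String` type parameters are hypothetical, and the sketch assumes put() still synchronizes on the set and refuses to add to an empty one, as tryPut() does.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
import java.util.{Set => JSet}

object RemoveSketch {
  // Hypothetical helper: removes value from the set stored under key and
  // drops the key once its set is empty.
  def removeValue(container: ConcurrentHashMap[String, JSet[String]],
                  key: String, value: String): Unit = {
    val set = container.get(key)
    if (set ne null) {
      set.remove(value) // no lock is held for the common case
      if (set.isEmpty) set.synchronized { // take the lock only when the set looks empty
        // Re-check under the lock before dropping the key
        if (set.isEmpty) container.remove(key)
      }
    }
  }
}
```

Removing a value from a set that still holds other values never touches the lock; only the remove that drains the set pays for the synchronized block.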

If the typical usage pattern of the Map is having a lot of values for the same key, it might be worth benchmarking a version which does not synchronize on the sets but instead always runs a container.putIfAbsent() in put(), so it simply overrides any concurrent calls to container.remove(). I also think the code could be greatly simplified using this approach... I'll check it out and do a micro-benchmark. :)

Very good points, looking forward to seeing what you come up with :-)

Daggerrz: Check Index2 (ConcurrentMultiMap2.scala). Is that along the lines of what you were proposing?
I'm thinking about removing set.isEmpty in remove and doing a conditional remove on container instead, with a fixed empty set:
private val emptySet = new ConcurrentSkipListSet[V]
...
container.remove(key,emptySet) //Avoids race between isEmpty and remove

WDYT?

The last suggestion ended up being much trickier than I thought to implement. The code reads easier, but I ended up having to synchronize on the container which obviously is worse performance-wise.

Oh, you responded too quickly for me. Gonna look at Index2 now.

Yeah, that was what I was thinking of. But, I think this will leak if the set is drained between set.add(value) and container.putIfAbsent. The problem is that the set is "visible" for remove() between those two lines.

To clarify:

set add value
// a concurrent remove() here leaves an empty set in the container
container.putIfAbsent(key, set)

Which in your code is:

 if (set.add(value))              //If the value was previously not added
    // a concurrent remove() here leaves an empty set in the container
    container.putIfAbsent(key,set) //Re-add the set to ensure it hasn't been removed

Regarding:
private val emptySet = new ConcurrentSkipListSet[V]
... container.remove(key,emptySet) //Avoids race between isEmpty and remove

That looks like a neat approach, but I'm afraid you'll have the same timing issues here, as an add to the set might be interleaved between the equals() call and the actual removal.

Got a hunch maybe resorting to immutable sets might have something to offer here...

Checked out the Javadoc and your approach will work:
--snip--
Removes the entry for a key only if currently mapped to a given value. This is equivalent to
  if (map.containsKey(key) && map.get(key).equals(value)) {
      map.remove(key);
      return true;
  } else return false;
except that the action is performed atomically.
--snip--
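
A small single-threaded sketch of what the quoted Javadoc means for the emptySet trick. The object name `ConditionalRemoveDemo`, the method `run` and the key/value strings are made up for illustration. It only shows the equals-based comparison; the atomicity described in the Javadoc concerns the map entry and says nothing about concurrent changes to the set's contents.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
import java.util.{Set => JSet}

object ConditionalRemoveDemo {
  // Returns (removed while non-empty, removed when empty, key still present)
  def run(): (Boolean, Boolean, Boolean) = {
    val container = new ConcurrentHashMap[String, JSet[String]]
    val emptySet  = new ConcurrentSkipListSet[String] // fixed sentinel, never mutated

    val live = new ConcurrentSkipListSet[String]
    live.add("a")
    container.put("k", live)

    // The stored set holds "a", so it does not equal emptySet: nothing is removed
    val removedWhileNonEmpty = container.remove("k", emptySet)

    live.remove("a")
    // The stored set is now empty and equals emptySet: the entry is removed
    val removedWhenEmpty = container.remove("k", emptySet)

    (removedWhileNonEmpty, removedWhenEmpty, container.containsKey("k"))
  }
}
```

Two distinct ConcurrentSkipListSet instances compare equal when they hold the same elements, which is why a single shared empty set can stand in for "whatever empty set is currently mapped".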

It uses a lock, but I don't know the performance impact of that compared to a synchronized block. :)

I re-wrote ConcurrentMultiMap2.scala. Check it out, I think it's rather elegant actually.

Yeah, that's much more concise and readable, but can still put empty sets into the container... I think you have to add the synch lock again to avoid it. The new version will still be much more elegant than the original, though. :)

You mean if there's a race between

t1: set add value
t2: set remove value
t2: container.remove(key,emptySet)
t1: container.putIfAbsent(key,set)

?

Yes, between the last two lines. At Jonas' talk at Javazone right now, so I'm a little distracted and typing constrained.
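
The interleaving discussed above can be replayed step by step on a single thread. This is a sketch, not a concurrency test: the object `InterleavingReplay`, the method `replay` and the key/value strings are hypothetical, and it assumes both threads have already fetched the same set from the container, with t1 re-adding a value that is already present.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
import java.util.{Set => JSet}

object InterleavingReplay {
  // Replays the four steps in order and returns whatever is left
  // under the key afterwards (null if the key is absent).
  def replay(): JSet[String] = {
    val container = new ConcurrentHashMap[String, JSet[String]]
    val emptySet  = new ConcurrentSkipListSet[String]

    // Assumed starting state: the key maps to a set holding "v",
    // and both t1 and t2 hold a reference to that set.
    val set: JSet[String] = new ConcurrentSkipListSet[String]
    set.add("v")
    container.put("k", set)

    set.add("v")                    // t1: set add value
    set.remove("v")                 // t2: set remove value (set is now empty)
    container.remove("k", emptySet) // t2: container.remove(key,emptySet)
    container.putIfAbsent("k", set) // t1: container.putIfAbsent(key,set)

    container.get("k")
  }
}
```

After the replay the key is mapped again, but to an empty set: t1's putIfAbsent re-inserts the set that t2 has just drained and removed.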
