-
-
Save viktorklang/566793 to your computer and use it in GitHub Desktop.
package my.concurrent.multimap | |
import scala.reflect.Manifest | |
import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap} | |
import java.util.{Set => JSet} | |
import scala.collection.JavaConversions._ | |
import annotation.tailrec | |
class Index[K <: AnyRef,V <: AnyRef : Manifest] { | |
private val Naught = Array[V]() //Nil for Arrays | |
private val container = new ConcurrentHashMap[K, JSet[V]] | |
def put(key: K, value: V) { | |
//Returns whether it needs to be retried or not | |
def tryPut(set: JSet[V], v: V): Boolean = { | |
set.synchronized { //Synchronize on the set to avoid clashes with remove | |
if (!set.isEmpty) { //Set will be empty here if there's a pending remove (delete) | |
set add v | |
false | |
} else true | |
} | |
} | |
@tailrec def syncPut(k: K, v: V): Boolean = { | |
var retry = false | |
val set = container get k | |
if (set ne null) retry = tryPut(set,v) | |
else { | |
val newSet = new ConcurrentSkipListSet[V] | |
newSet add v | |
// Parry for two simultaneous putIfAbsent(id,newSet) | |
val oldSet = container.putIfAbsent(k,newSet) | |
if (oldSet ne null) | |
retry = tryPut(oldSet,v) | |
} | |
if (retry) syncPut(k,v) | |
else true | |
} | |
syncPut(key,value) | |
} | |
def values(key: K) = { | |
val set: JSet[V] = container get key | |
if (set ne null) set toArray Naught | |
else Naught | |
} | |
def foreach(key: K)(fun: (V) => Unit) { | |
val set = container get key | |
if (set ne null) | |
set foreach fun | |
} | |
def foreach(fun: (K,V) => Unit) { | |
container.entrySet foreach { | |
(e) => e.getValue.foreach(fun(e.getKey,_)) | |
} | |
} | |
def remove(key: K, value: V) { | |
val set = container get key | |
if (set ne null) { | |
set.synchronized { //Synchronize on set to avoid to confict with put | |
set remove value | |
if (set.isEmpty) | |
container remove key | |
} | |
} | |
} | |
def clear = container.clear | |
} |
class Index[K <: AnyRef,V <: AnyRef : Manifest] { | |
import scala.collection.JavaConversions._ | |
private val Naught = Array[V]() //Nil for Arrays | |
private val container = new ConcurrentHashMap[K, JSet[V]] | |
private val emptySet = new ConcurrentSkipListSet[V] | |
@tailrec protected final def spinPut(key: K, value: V, set: JSet[V]) { | |
set add value //Add the value to the set | |
val oldSet = container.putIfAbsent(key,set) //Add the set to the container (cancels out any removes) | |
if((oldSet ne null) && (oldSet ne set)) //If clash with other new add, first commit wins and the other tries again | |
spinPut(key,value,oldSet) | |
} | |
def put(key: K, value: V) { | |
val set = container get key | |
spinPut(key,value, if (set ne null) set else new ConcurrentSkipListSet[V]) | |
} | |
def values(key: K) = { | |
val set: JSet[V] = container get key | |
if (set ne null) set toArray Naught | |
else Naught | |
} | |
def foreach(key: K)(fun: (V) => Unit) { | |
val set = container get key | |
if (set ne null) | |
set foreach fun | |
} | |
def foreach(fun: (K,V) => Unit) { | |
container.entrySet foreach { | |
(e) => e.getValue.foreach(fun(e.getKey,_)) | |
} | |
} | |
def remove(key: K, value: V) { | |
val set = container get key | |
if ((set ne null) && (set remove value)) | |
container.remove(key,emptySet) | |
} | |
def clear = container.clear | |
} |
Yeah, that was what I was thinking of. But, I think this will leak if the set is drained between set.add(value) and container.putIfAbsent. The problem is that the set is "visible" for remove() between those two lines.
To clarify:
set add value
// a concurrent remove() here leaves an empty set in the container
container.putIfAbsent(key, set)
Which in your code is:
if (set.add(value)) //If the value was previously not added
// a concurrent remove() here leaves an empty set in the container
container.putIfAbsent(key,set) //Re-add the set to ensure it hasn't been removed
Regarding:
private val emptySet = new ConcurrentSkipListSet[V]
... container.remove(key,emptySet) //Avoids race between isEmpty and remove
That looks like a neat approach, but I'm afraid you'll have the same timing issues here as an add to set set might be interleaved between the equals()-call and the actual removal.
Got a hunch maybe resorting to immutable sets might have something to offer here...
Checked out the Javadoc and your approach will work:
--snip--
Removes the entry for a key only if currently mapped to a given value. This is equivalent to
if (map.containsKey(key) && map.get(key).equals(value)) {
map.remove(key);
return true;
} else return false;
except that the action is performed atomically.
--snip--
It uses a lock, but I don't know the performance impact of that compared to a synchronized block. :)
I re-wrote ConcurrentMultiMap2.scala, check it out, I think it's rather elegant actually.
Yeah, that's much more concise and readable, but can still put empty sets into the container... I think you have to add the synch lock again to avoid it. The new version will still be much more elegant than the original, though. :)
You mean if there's a race between
t1: set add value
t2: set remove value
t2: container.remove(key,emptySet)
t1: container.putIfAbsent(key,set)
?
Yes, between the last two lines. At Jonas' talk at Javazone right now, so I'm a little distracted and typing constrained.
Oh, you responded too quickly for me. Gonna look at Index2 now.