@viktorklang
Created September 6, 2010 08:27
package my.concurrent.multimap

import scala.reflect.Manifest
import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap}
import java.util.{Set => JSet}
import scala.collection.JavaConversions._
import annotation.tailrec

class Index[K <: AnyRef,V <: AnyRef : Manifest] {
  private val Naught = Array[V]() //Nil for Arrays
  private val container = new ConcurrentHashMap[K, JSet[V]]

  def put(key: K, value: V) {
    //Returns whether it needs to be retried or not
    def tryPut(set: JSet[V], v: V): Boolean = {
      set.synchronized { //Synchronize on the set to avoid clashes with remove
        if (!set.isEmpty) { //Set will be empty here if there's a pending remove (delete)
          set add v
          false
        } else true
      }
    }

    @tailrec def syncPut(k: K, v: V): Boolean = {
      var retry = false
      val set = container get k
      if (set ne null) retry = tryPut(set,v)
      else {
        val newSet = new ConcurrentSkipListSet[V]
        newSet add v
        // Guard against two threads calling putIfAbsent(k,newSet) simultaneously
        val oldSet = container.putIfAbsent(k,newSet)
        if (oldSet ne null)
          retry = tryPut(oldSet,v)
      }
      if (retry) syncPut(k,v)
      else true
    }

    syncPut(key,value)
  }

  def values(key: K) = {
    val set: JSet[V] = container get key
    if (set ne null) set toArray Naught
    else Naught
  }

  def foreach(key: K)(fun: (V) => Unit) {
    val set = container get key
    if (set ne null)
      set foreach fun
  }

  def foreach(fun: (K,V) => Unit) {
    container.entrySet foreach {
      (e) => e.getValue.foreach(fun(e.getKey,_))
    }
  }

  def remove(key: K, value: V) {
    val set = container get key
    if (set ne null) {
      set.synchronized { //Synchronize on the set to avoid conflicting with put
        set remove value
        if (set.isEmpty)
          container remove key
      }
    }
  }

  def clear = container.clear
}
// ConcurrentMultimap2.scala (referred to as Index2 in the comments)
import java.util.concurrent.{ConcurrentSkipListSet, ConcurrentHashMap}
import java.util.{Set => JSet}
import annotation.tailrec

class Index[K <: AnyRef,V <: AnyRef : Manifest] {
  import scala.collection.JavaConversions._
  private val Naught = Array[V]() //Nil for Arrays
  private val container = new ConcurrentHashMap[K, JSet[V]]
  private val emptySet = new ConcurrentSkipListSet[V] //Never mutated; only used as the expected value in remove

  @tailrec protected final def spinPut(key: K, value: V, set: JSet[V]) {
    set add value //Add the value to the set
    val oldSet = container.putIfAbsent(key,set) //Add the set to the container (cancels out any removes)
    if ((oldSet ne null) && (oldSet ne set)) //Another thread's new set was committed first: it wins, so retry with that set
      spinPut(key,value,oldSet)
  }

  def put(key: K, value: V) {
    val set = container get key
    spinPut(key,value, if (set ne null) set else new ConcurrentSkipListSet[V])
  }

  def values(key: K) = {
    val set: JSet[V] = container get key
    if (set ne null) set toArray Naught
    else Naught
  }

  def foreach(key: K)(fun: (V) => Unit) {
    val set = container get key
    if (set ne null)
      set foreach fun
  }

  def foreach(fun: (K,V) => Unit) {
    container.entrySet foreach {
      (e) => e.getValue.foreach(fun(e.getKey,_))
    }
  }

  def remove(key: K, value: V) {
    val set = container get key
    if ((set ne null) && (set remove value))
      container.remove(key,emptySet) //Atomically removes the entry only if the mapped set equals emptySet, i.e. is empty at that moment
  }

  def clear = container.clear
}
@daggerrz

daggerrz commented Sep 7, 2010

Main objection is a lack of clarity / intent:

  • The !set.isEmpty check relies on the not-so-obvious fact that an empty set will only exist in the container if it's about to be removed by another thread. This should be commented to improve readability.
  • When reading tryPut() alone, the synchronization on the set seems unnecessary, and its purpose does not become apparent until you look at remove(). I'd move the methods next to each other and comment on the relationship between the synchronization blocks.
  • The return type of put() is not easy to spot. The usual Map.put() return type doesn't make sense here, but perhaps a Boolean like Set.add() would be useful?
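Picking up the third point: a minimal sketch of how put() could report a Set.add()-style result while keeping the first version's set-synchronization scheme. BooleanPutIndex and its contains helper are hypothetical names, not part of the gist, and this is one possible design rather than the author's. As in the gist, values go into a ConcurrentSkipListSet, so they must be Comparable (String in the usage below).

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
import java.util.{Set => JSet}
import scala.annotation.tailrec

// Hypothetical variant of the first Index whose put returns, like java.util.Set.add,
// whether the value was newly added. Same locking idea: synchronize on the set.
class BooleanPutIndex[K <: AnyRef, V <: AnyRef] {
  private val container = new ConcurrentHashMap[K, JSet[V]]

  // Some(added) if the value went into a live set; None if the set was empty,
  // meaning a concurrent remove() is about to drop it and the caller must retry.
  private def tryPut(set: JSet[V], v: V): Option[Boolean] =
    set.synchronized {
      if (!set.isEmpty) Some(set.add(v)) else None
    }

  @tailrec final def put(key: K, value: V): Boolean = {
    val set = container.get(key)
    val result =
      if (set ne null) tryPut(set, value)
      else {
        val newSet = new ConcurrentSkipListSet[V]
        newSet.add(value)
        val oldSet = container.putIfAbsent(key, newSet)
        if (oldSet eq null) Some(true) // our new set was installed: the value is new
        else tryPut(oldSet, value)     // another thread installed a set first
      }
    result match {
      case Some(added) => added
      case None        => put(key, value) // pending remove: retry, as syncPut does
    }
  }

  def contains(key: K, value: V): Boolean = {
    val set = container.get(key)
    (set ne null) && set.contains(value)
  }
}
```

Keeping the retry signal (None) separate from the "was it new" answer (Some(added)) leaves the tail-recursive retry loop shaped like syncPut.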

For performance reasons, it would be nice if the synchronized blocks could be avoided - at least for all puts and removes. A quick win in remove() would be:

set remove value
if (set.isEmpty) set.synchronized {
    if (set.isEmpty) container remove key
}

If the typical usage pattern of the Map is having a lot of values for the same key, it might be worth benchmarking a version which does not synchronize on the sets but instead always runs container.putIfAbsent() in put(), so that it simply overrides any concurrent calls to container.remove(). I also think the code could be greatly simplified using this approach... I'll check it out and do a micro-benchmark. :)

@viktorklang
Author

Very good points, looking forward to seeing what you come up with :-)

@viktorklang
Author

Daggerrz: check Index2 (ConcurrentMultimap2.scala). Is that along the lines of what you were proposing?
I'm thinking about removing the set.isEmpty check in remove and doing a conditional remove on the container instead, with a fixed empty set:
private val emptySet = new ConcurrentSkipListSet[V]
...
container.remove(key,emptySet) //Avoids race between isEmpty and remove

WDYT?
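The conditional remove works because java.util.Set equality is by content, not identity: a drained set in the container is equal to the fixed emptySet even though it is a different instance. A minimal check, where the object name EmptySetEquality is hypothetical:

```scala
import java.util.concurrent.ConcurrentSkipListSet

object EmptySetEquality {
  // The fixed sentinel from the proposal; it is never mutated
  val emptySet = new ConcurrentSkipListSet[String]

  def main(args: Array[String]): Unit = {
    val drained = new ConcurrentSkipListSet[String]
    drained.add("v")
    drained.remove("v")
    println(drained eq emptySet)      // false: two distinct instances
    println(drained.equals(emptySet)) // true: Set equality compares elements, not identity

    val full = new ConcurrentSkipListSet[String]
    full.add("v")
    println(full.equals(emptySet))    // false: sizes differ
  }
}
```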

@daggerrz

daggerrz commented Sep 7, 2010

The last suggestion ended up being much trickier to implement than I thought. The code reads more easily, but I ended up having to synchronize on the container, which is obviously worse performance-wise.

@daggerrz

daggerrz commented Sep 7, 2010

Oh, you responded too quickly for me. Gonna look at Index2 now.

@daggerrz

daggerrz commented Sep 7, 2010

Yeah, that was what I was thinking of. But I think this will leak if the set is drained between set.add(value) and container.putIfAbsent. The problem is that the set is "visible" to remove() between those two lines.

@daggerrz

daggerrz commented Sep 7, 2010

To clarify:

set add value
// a concurrent remove() here leaves an empty set in the container
container.putIfAbsent(key, set)

@daggerrz

daggerrz commented Sep 7, 2010

Which in your code is:

 if (set.add(value))              //If the value was previously not added
    // a concurrent remove() here leaves an empty set in the container
    container.putIfAbsent(key,set) //Re-add the set to ensure it hasn't been removed

@daggerrz

daggerrz commented Sep 7, 2010

Regarding:
private val emptySet = new ConcurrentSkipListSet[V]
... container.remove(key,emptySet) //Avoids race between isEmpty and remove

That looks like a neat approach, but I'm afraid you'll have the same timing issues here, as an add to the set might be interleaved between the equals() call and the actual removal.

Got a hunch maybe resorting to immutable sets might have something to offer here...

@daggerrz

daggerrz commented Sep 7, 2010

Checked out the Javadoc and your approach will work:
--snip--
Removes the entry for a key only if currently mapped to a given value. This is equivalent to
if (map.containsKey(key) && map.get(key).equals(value)) {
    map.remove(key);
    return true;
} else return false;
except that the action is performed atomically.
--snip--

It uses a lock, but I don't know the performance impact of that compared to a synchronized block. :)
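Applied to the emptySet idea, the quoted contract means the entry disappears only if the mapped set equals the sentinel, i.e. is empty, at the moment of the atomic check. A minimal sketch; ConditionalRemoveDemo, removeIfEmpty and the "k"/"v" strings are hypothetical names chosen for illustration:

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
import java.util.{Set => JSet}

object ConditionalRemoveDemo {
  // Fixed, never-mutated sentinel, as in the emptySet proposal
  private val emptySet: JSet[String] = new ConcurrentSkipListSet[String]

  // Removes the entry only if the set currently mapped to key is equal to emptySet
  def removeIfEmpty(container: ConcurrentHashMap[String, JSet[String]], key: String): Boolean =
    container.remove(key, emptySet)

  def main(args: Array[String]): Unit = {
    val container = new ConcurrentHashMap[String, JSet[String]]
    val set = new ConcurrentSkipListSet[String]
    set.add("v")
    container.put("k", set)

    println(removeIfEmpty(container, "k")) // false: the mapped set still holds "v"
    set.remove("v")
    println(removeIfEmpty(container, "k")) // true: the drained set equals emptySet
    println(container.containsKey("k"))    // false
    println(removeIfEmpty(container, "k")) // false: no mapping left
  }
}
```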

@viktorklang
Author

I rewrote ConcurrentMultiMap2.scala; check it out. I think it's rather elegant, actually.

@daggerrz

daggerrz commented Sep 8, 2010

Yeah, that's much more concise and readable, but it can still put empty sets into the container... I think you have to add the synchronization lock again to avoid it. The new version will still be much more elegant than the original, though. :)

@viktorklang
Author

You mean if there's a race between

t1: set add value
t2: set remove value
t2: container.remove(key,emptySet)
t1: container.putIfAbsent(key,set)

?

@daggerrz

daggerrz commented Sep 8, 2010

Yes, between the last two lines. I'm at Jonas' talk at JavaZone right now, so I'm a little distracted and typing-constrained.
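The interleaving confirmed above can be replayed deterministically on a single thread by performing Index2's put and remove steps by hand in that order. A sketch, assuming the key starts out mapped to a set holding "v" while t1 calls put(k, "v") and t2 calls remove(k, "v"); EmptySetLeakReplay and replay are hypothetical names:

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentSkipListSet}
import java.util.{Set => JSet}

object EmptySetLeakReplay {
  // Fixed sentinel, as in Index2
  val emptySet: JSet[String] = new ConcurrentSkipListSet[String]

  // Returns the container state after replaying the t1/t2 interleaving step by step
  def replay(): ConcurrentHashMap[String, JSet[String]] = {
    val container = new ConcurrentHashMap[String, JSet[String]]
    val initial = new ConcurrentSkipListSet[String]
    initial.add("v")
    container.put("k", initial)

    // Both threads have already read the same set via container.get("k")
    val set = container.get("k")

    set.add("v")                                   // t1: set add value (already present)
    set.remove("v")                                // t2: set remove value, the set is now empty
    val removed = container.remove("k", emptySet)  // t2: container.remove(key, emptySet) succeeds
    val previous = container.putIfAbsent("k", set) // t1: container.putIfAbsent(key, set) re-inserts it
    assert(removed && (previous eq null))
    container
  }

  def main(args: Array[String]): Unit = {
    val c = replay()
    println(c.containsKey("k")) // true
    println(c.get("k").isEmpty) // true: an empty set is back in the container
  }
}
```

The end state is still logically correct ("v" is absent), but Index2's remove() will not clean the empty set up, because it only calls container.remove(key,emptySet) when set.remove(value) returned true; a later put() for the same key simply reuses it.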
