Skip to content

Instantly share code, notes, and snippets.

@cscotta
Created June 30, 2012 19:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cscotta/3025111 to your computer and use it in GitHub Desktop.
Save cscotta/3025111 to your computer and use it in GitHub Desktop.
scala.concurrent.forkjoin.LinkedTransferQueue hazard in .size()
Hello,
There appears to be an issue with the version of the JSR166 LinkedTransferQueue
that ships with Scala 2.9.2 (also confirmed under 2.9.1) that can result in
calls to LTQ.size() spinning infinitely and failing to return. This issue does
not occur when using the latest version of LinkedTransferQueue in JSR166y.
We've run into this issue in production at Boundary a few times and finally sat
down to reproduce it. Fortunately, it is very easy to trigger and has been fixed
upstream in the JSR166 project.
Steps to reproduce in Scala 2.9.2:
1. Run the "Repro.scala" sample program with: scala Repro.scala
2. Observe output similar to bug-reproduced.txt (below)
3. Notice that calls to .size() stop increasing
Steps to demonstrate success with the latest version of JSR166:
1. Fetch the latest JSR166 at http://gee.cs.oswego.edu/dl/jsr166/dist/jsr166y.jar
2. Replace "import scala.concurrent.forkjoin.LinkedTransferQueue" with
"import jsr166y.LinkedTransferQueue" in Repro.scala below.
3. Run the "Repo.scala" sample program with: scala -cp jsr166y.jar Repro.scala
4. Observe output similar to bug-resolved.txt (below)
I would recommend upgrading this package (or at least LinkedTransferQueue) to
the latest available release of JSR166.
Thanks,
Scott Andreas
s@boundary.com
cscotta@ordasity.local:~/Desktop$ scala Repro.scala
Offers: 1406. Takes: 1361. Size: 65. Sizes: 30
Offers: 3504869. Takes: 3504076. Size: 18. Sizes: 4015
Offers: 7134683. Takes: 7134748. Size: 18. Sizes: 4015
Last size() operation completed 1983ms ago. Triggered the bug!
Offers: 11026499. Takes: 11026758. Size: 18. Sizes: 4015
Last size() operation completed 2997ms ago. Triggered the bug!
Offers: 14646726. Takes: 14646788. Size: 18. Sizes: 4015
Last size() operation completed 3997ms ago. Triggered the bug!
Offers: 17611317. Takes: 17610724. Size: 18. Sizes: 4015
Last size() operation completed 4997ms ago. Triggered the bug!
Offers: 21249662. Takes: 21249723. Size: 18. Sizes: 4015
cscotta@ordasity.local:~/Desktop$ scala -cp jsr166y.jar Repro.scala
Offers: 1800. Takes: 1793. Size: 6. Sizes: 30
Offers: 6821272. Takes: 6821381. Size: 6. Sizes: 1644563
Offers: 14019177. Takes: 14019250. Size: 1. Sizes: 3212686
Offers: 21245130. Takes: 21245229. Size: 8. Sizes: 4660727
Offers: 28501419. Takes: 28501525. Size: 2. Sizes: 6175792
Offers: 35828129. Takes: 35828237. Size: 3318. Sizes: 7604674
Offers: 43156621. Takes: 43156711. Size: 2. Sizes: 8952170
Offers: 50380566. Takes: 50380492. Size: 268. Sizes: 10428904
Offers: 57742001. Takes: 57742066. Size: 60. Sizes: 11757772
...
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicLong
//import jsr166y.LinkedTransferQueue
import scala.concurrent.forkjoin.LinkedTransferQueue
// Queue and the object we'll repeatedly insert
val obj = new Object
val q = new LinkedTransferQueue[Object]
// Counters
val size = new AtomicLong(0)
val takes = new AtomicLong(0)
val offers = new AtomicLong(0)
val sizeCount = new AtomicLong(0)
val lastSizeComplete = new AtomicLong(0)
// A producer to insert into the queue
val producer = new Runnable() {
def run = { while (true) if (q.offer(obj)) offers.incrementAndGet }
}
// A consumer to retrieve elements from the queue
val consumer = new Runnable() {
def run = { while (true) if (q.poll != null) takes.incrementAndGet }
}
// A Runnable to repeatedly call .size() on the queue
val sizer = new Runnable() {
def run = {
while (true) {
size.set(q.size)
sizeCount.incrementAndGet
lastSizeComplete.set(System.currentTimeMillis)
}
}
}
// Log stats observed when running the above.
val stats = new Runnable() {
def run = {
while (true) {
println("Offers: " + offers.get + ". Takes: " + takes.get +
". Size: " + size.get + ". Sizes: " + sizeCount.get)
val lastSizeAgo = System.currentTimeMillis - lastSizeComplete.get
if (lastSizeAgo > 1000)
println("Last size() operation completed " + lastSizeAgo +
"ms ago. Triggered the bug!")
Thread.sleep(1000)
}
}
}
// Launch the producers and consumers
for (i <- 0 until 5) {
new Thread(producer, "Producer-" + i).start
new Thread(consumer, "Consumer-" + i).start
}
// Launch the thread to call "size()" repeatedly, begin printing stats, and wait.
new Thread(sizer, "Sizer").start()
new Thread(stats, "Stats").start()
new CountDownLatch(1).await()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment