Skip to content

Instantly share code, notes, and snippets.

@akihiro4chawon
Created May 19, 2011 16:09
Show Gist options
  • Save akihiro4chawon/981139 to your computer and use it in GitHub Desktop.
Save akihiro4chawon/981139 to your computer and use it in GitHub Desktop.
Practical implementations of parallel sorting algorithms in Scala
import java.util.Arrays
/** Common contract shared by every sorting strategy compared in this file. */
abstract class Sorter {

  /**
   * Produces the values of `a` in ascending order.
   *
   * Implementations may sort `a` in place and hand back the same array, or
   * return a newly allocated one.
   *
   * @param a the values to sort
   * @return an array holding the values of `a` in ascending order
   */
  def sorted(a: Array[Int]): Array[Int]
}
/** Sequential baseline that delegates to `java.util.Arrays.sort`. */
object SimpleSorter extends Sorter {

  /**
   * Sorts `a` in place.
   *
   * @param a the values to sort; reordered by this call
   * @return the very same array instance, now in ascending order
   */
  def sorted(a: Array[Int]): Array[Int] = {
    Arrays.sort(a, 0, a.length)
    a
  }
}
/**
 * Two-way parallel merge sort: both halves of the input are sorted
 * concurrently with `java.util.Arrays.sort` and then merged sequentially.
 */
object DivideAndMergeParallelSorter extends Sorter {

  /**
   * Sorts the values of `a` into a newly allocated array.
   *
   * As a side effect `a` itself is reordered: each of its two halves ends up
   * sorted in place. Arrays with fewer than two elements are just copied.
   *
   * @param a the values to sort
   * @return a new array containing the values of `a` in ascending order
   */
  def sorted(a: Array[Int]): Array[Int] = {
    val len = a.length
    if (len < 2) a.clone()
    else {
      val half = len / 2

      // Sort the lower half on a helper thread while this thread handles the
      // upper half. A plain Thread is used instead of
      // `scala.concurrent.ops.par`, which is no longer part of the standard
      // library.
      val lowerHalfSorter = new Thread(new Runnable {
        def run(): Unit = Arrays.sort(a, 0, half)
      })
      lowerHalfSorter.start()
      Arrays.sort(a, half, len)
      lowerHalfSorter.join() // join also publishes the helper's writes to this thread

      // Standard two-finger merge of a[0, half) and a[half, len).
      val ret = new Array[Int](len)
      var i = 0    // next free slot in ret
      var j = 0    // cursor in the lower half
      var k = half // cursor in the upper half
      while (j < half && k < len) {
        if (a(j) <= a(k)) { ret(i) = a(j); j += 1 }
        else { ret(i) = a(k); k += 1 }
        i += 1
      }
      // Exactly one of the two halves still has elements; the other copy is empty.
      System.arraycopy(a, j, ret, i, half - j)
      System.arraycopy(a, k, ret, i + (half - j), len - k)
      ret
    }
  }
}
/**
 * N-way parallel merge sort: the input is copied into one slice per available
 * processor, the slices are sorted in parallel, and the sorted slices are
 * combined with a parallel `reduce` over a two-way merge.
 */
object DivideAndMergeParallelSorter2 extends Sorter {

  /**
   * Sorts the values of `a` into a newly allocated array; `a` is not modified.
   *
   * @param a the values to sort (may be empty or shorter than the processor count)
   * @return a new array containing the values of `a` in ascending order
   */
  def sorted(a: Array[Int]): Array[Int] = {
    val len = a.length
    // Never create more slices than there are elements (and always at least
    // one), so short inputs work instead of being rejected.
    val nDiv = math.max(1, math.min(Runtime.getRuntime.availableProcessors, len))

    // Slice boundaries are computed in Long: `i * len` overflows Int for large arrays.
    def bound(i: Int): Int = (i.toLong * len / nDiv).toInt

    val pslices = (0 until nDiv).par.map { i => Arrays.copyOfRange(a, bound(i), bound(i + 1)) }
    pslices.foreach { slice => Arrays.sort(slice) }

    // Two-finger merge of two sorted arrays into a fresh one; either side may be empty.
    def merge(x: Array[Int], y: Array[Int]): Array[Int] = {
      val xlen = x.length
      val ylen = y.length
      val ret = new Array[Int](xlen + ylen)
      var i = 0
      var j = 0
      var k = 0
      while (j < xlen && k < ylen) {
        if (x(j) <= y(k)) { ret(i) = x(j); j += 1 }
        else { ret(i) = y(k); k += 1 }
        i += 1
      }
      // At most one of these two copies moves any elements.
      System.arraycopy(x, j, ret, i, xlen - j)
      System.arraycopy(y, k, ret, i + (xlen - j), ylen - k)
      ret
    }

    pslices.reduce(merge)
  }
}
/**
 * Shell sort over the gap sequence 1, 4, 13, 40, ... (g => 3g + 1) in which,
 * for every gap, the interleaved chains are insertion-sorted through a
 * parallel range. Written with iterators rather than hand-rolled loops.
 */
object ShellParallelSorter extends Sorter {

  /**
   * Sorts `a` in place.
   *
   * @param a the values to sort; reordered by this call
   * @return the same array instance
   */
  def sorted(a: Array[Int]): Array[Int] = {
    // First member of the gap sequence that is strictly larger than the array.
    val firstGapAbove = Iterator.iterate(1)(g => g * 3 + 1).find(g => a.length < g).get
    // Walk the sequence back down, starting one step below that member.
    val gaps = Iterator.iterate(firstGapAbove / 3)(g => g / 3).takeWhile(g => 1 <= g)

    gaps.foreach { h =>
      // Chain k consists of the indices k, k + h, k + 2h, ...; chains are disjoint.
      (0 until h).par.foreach { k =>
        var i = k
        while (i < a.size) {
          val v = a(i)
          var j = i
          while (j >= h && a(j - h) > v) {
            a(j) = a(j - h)
            j -= h
          }
          a(j) = v
          i += h
        }
      }
    }
    a
  }
}
/**
 * Shell sort with the 3g + 1 gap sequence, using plain `while` loops for the
 * gap bookkeeping and a parallel range over the chains of each gap.
 */
object ShellParallelSorterOpt extends Sorter {

  /**
   * Sorts `a` in place.
   *
   * @param a the values to sort; reordered by this call
   * @return the same array instance
   */
  def sorted(a: Array[Int]): Array[Int] = {
    val len = a.length

    // Grow the gap along 1, 4, 13, 40, ... until it is no longer below len.
    var h = 1
    while (h < len) h = h * 3 + 1

    h /= 3
    while (h >= 1) {
      val gap = h
      // Each k selects the disjoint chain k, k + gap, k + 2 * gap, ...
      (0 until gap).par.foreach { k =>
        var i = k
        while (i < len) {
          val v = a(i)
          var j = i
          while (j >= gap && a(j - gap) > v) {
            a(j) = a(j - gap)
            j -= gap
          }
          a(j) = v
          i += gap
        }
      }
      h /= 3
    }
    a
  }
}
/**
 * Shell sort variant that skips the largest gaps, runs the remaining gaps
 * above 1 through a parallel range, and finishes with a sequential
 * insertion sort (gap 1).
 */
object ShellParallelSorterOpt2 extends Sorter {

  /**
   * Sorts `a` in place.
   *
   * @param a the values to sort; reordered by this call
   * @return the same array instance
   */
  def sorted(a: Array[Int]): Array[Int] = {
    val len = a.length

    // Grow the gap along 1, 4, 13, 40, ... until it is no longer below len.
    var h = 1
    while (h < len) h = h * 3 + 1

    // The first parallel pass starts three steps further down the sequence.
    h /= 3
    h /= 3
    h /= 3
    while (h > 1) {
      val gap = h
      // Each k selects the disjoint chain k, k + gap, k + 2 * gap, ...
      (0 until gap).par.foreach { k =>
        var i = k
        while (i < len) {
          val v = a(i)
          var j = i
          while (j >= gap && a(j - gap) > v) {
            a(j) = a(j - gap)
            j -= gap
          }
          a(j) = v
          i += gap
        }
      }
      h /= 3
    }

    // Final sequential insertion sort. The gap is the literal 1 here: after
    // the loop above `h` is 0 for short arrays, and comparing a(j - h) with
    // h == 0 would compare every element with itself and sort nothing.
    var i = 1
    while (i < len) {
      val v = a(i)
      var j = i
      while (j >= 1 && a(j - 1) > v) {
        a(j) = a(j - 1)
        j -= 1
      }
      a(j) = v
      i += 1
    }
    a
  }
}
/**
 * Shell sort that spawns fresh threads for every gap larger than 1 and
 * finishes with a sequential insertion sort.
 *
 * The array, the current gap and the chain counter live in fields of this
 * singleton so that worker threads (which run `run()`) can reach them.
 */
object ShellParallelSorterThread extends Sorter with Runnable {
  /** Index of the next chain to hand out; workers stop once it goes negative. */
  val count = new java.util.concurrent.atomic.AtomicInteger
  /** Gap of the pass currently being executed. */
  var h: Int = 1
  /** Array currently being sorted. */
  var a: Array[Int] = null

  /**
   * Worker body: repeatedly claims a chain index from `count` and
   * insertion-sorts the chain index, index + h, index + 2h, ... until no
   * chain is left.
   */
  def run(): Unit = {
    val data = this.a
    val len = data.length
    val gap = this.h
    var chain = count.getAndDecrement()
    while (chain >= 0) {
      var i = chain
      while (i < len) {
        val v = data(i)
        var j = i
        while (j >= gap && data(j - gap) > v) {
          data(j) = data(j - gap)
          j -= gap
        }
        data(j) = v
        i += gap
      }
      chain = count.getAndDecrement()
    }
  }

  /**
   * Sorts `a` in place.
   *
   * @param a the values to sort; reordered by this call
   * @return the same array instance
   */
  def sorted(a: Array[Int]): Array[Int] = {
    this.a = a
    val len = a.length

    // Start from a known gap so the sequence never depends on what an
    // earlier call left behind in the field.
    h = 1
    while (h < len) h = h * 3 + 1

    // The first threaded pass starts two steps down the sequence.
    h /= 3
    h /= 3
    while (h > 1) {
      count.set(h - 1) // chains h - 1 down to 0 are up for grabs
      // The calling thread works too, hence one helper fewer than processors.
      val nThreads = (Runtime.getRuntime.availableProcessors - 1) min (h - 1)
      val pool = Array.fill(nThreads)(new Thread(this))
      pool.foreach(_.start())
      run()
      pool.foreach(_.join())
      h /= 3
    }

    // Final sequential insertion sort with a literal gap of 1. Reusing `h`
    // here is wrong: it is 0 after the loop for very short arrays, which
    // would turn this pass into a no-op and leave them unsorted.
    var i = 1
    while (i < len) {
      val v = a(i)
      var j = i
      while (j >= 1 && a(j - 1) > v) {
        a(j) = a(j - 1)
        j -= 1
      }
      a(j) = v
      i += 1
    }
    a
  }
}
/**
 * Shell sort that submits one task per chain to a fixed-size thread pool for
 * every gap larger than 1 and finishes with a sequential insertion sort.
 *
 * The array, the current gap, the chain counter and the per-pass latch live
 * in fields of this singleton so that pool tasks (which run `run()`) can
 * reach them.
 */
object ShellParallelSorterThreadPool extends Sorter with Runnable {
  import java.util.concurrent._

  /** Index of the next chain to hand out to a task. */
  val count = new java.util.concurrent.atomic.AtomicInteger
  /** Counts down once per finished task of the current pass. */
  var latch: CountDownLatch = null
  /** Gap of the pass currently being executed. */
  var h: Int = 1
  /** Array currently being sorted. */
  var a: Array[Int] = null

  /**
   * Task body: claims exactly one chain index from `count`, insertion-sorts
   * the chain index, index + h, index + 2h, ... and then signals the latch.
   */
  def run(): Unit = {
    val data = this.a
    val len = data.length
    val gap = this.h
    var i = count.getAndDecrement()
    while (i < len) {
      val v = data(i)
      var j = i
      while (j >= gap && data(j - gap) > v) {
        data(j) = data(j - gap)
        j -= gap
      }
      data(j) = v
      i += gap
    }
    latch.countDown()
  }

  /**
   * Sorts `a` in place.
   *
   * @param a the values to sort; reordered by this call
   * @return the same array instance
   */
  def sorted(a: Array[Int]): Array[Int] = {
    this.a = a
    val len = a.length

    // Start from a known gap so the sequence never depends on what an
    // earlier call left behind in the field.
    h = 1
    while (h < len) h = h * 3 + 1
    h /= 3

    val pool = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors)
    try {
      h /= 3
      while (h > 1) {
        count.set(h - 1)
        latch = new CountDownLatch(h) // one count per chain / task
        var submitted = 0
        while (submitted < h) {
          pool.execute(this)
          submitted += 1
        }
        latch.await()
        h /= 3
      }
    } finally {
      // Without this the pool's worker threads outlive every call: they are
      // never reclaimed and keep the JVM from exiting.
      pool.shutdown()
    }

    // Final sequential insertion sort with a literal gap of 1. Reusing `h`
    // here is wrong: it is 0 after the loop for very short arrays, which
    // would turn this pass into a no-op and leave them unsorted.
    var i = 1
    while (i < len) {
      val v = a(i)
      var j = i
      while (j >= 1 && a(j - 1) > v) {
        a(j) = a(j - 1)
        j -= 1
      }
      a(j) = v
      i += 1
    }
    a
  }
}
/**
 * Sequential quicksort with a random pivot and an insertion-sort cut-off for
 * short ranges.
 */
object QuickSeqSorter extends Sorter {

  /**
   * Sorts `a` in place.
   *
   * @param a the values to sort; reordered by this call
   * @return the same array instance
   */
  def sorted(a: Array[Int]): Array[Int] = {
    def swap(i: Int, j: Int): Unit = {
      val t = a(i); a(i) = a(j); a(j) = t
    }

    // Insertion sort of the inclusive range [b, e]; a no-op when e <= b.
    def isort(b: Int, e: Int): Unit = {
      var i = b + 1
      while (i <= e) {
        var j = i
        while (j > b && a(j) < a(j - 1)) {
          swap(j, j - 1)
          j -= 1
        }
        i += 1
      }
    }

    // Partitions the inclusive range [p, r] (requires r > p) around a pivot
    // drawn at random from [p, r - 1]; returns the pivot's final index.
    def partition(p: Int, r: Int): Int = {
      swap(p, p + scala.util.Random.nextInt(r - p))
      val x = a(p)
      var k = p + 1
      while (a(k) <= x && k < r) k += 1
      var l = r
      while (a(l) > x) l -= 1 // a(p) == x stops this scan at p at the latest
      while (k < l) {
        swap(k, l)
        // After the swap a(l) > x and a(k) <= x act as sentinels, so neither
        // scan below needs an explicit bounds check.
        k += 1
        while (a(k) <= x) k += 1
        l -= 1
        while (a(l) > x) l -= 1
      }
      swap(p, l)
      l
    }

    // Recurse only into the smaller partition and keep looping on the larger
    // one. Recursing into both sides lets the call depth grow linearly on
    // unlucky or duplicate-heavy input and overflow the stack; this way the
    // depth stays logarithmic in the array length.
    def rec(p0: Int, r0: Int): Unit = {
      var p = p0
      var r = r0
      while (r - p >= 7) {
        val q = partition(p, r)
        if (q - p < r - q) {
          rec(p, q - 1)
          p = q + 1
        } else {
          rec(q + 1, r)
          r = q - 1
        }
      }
      isort(p, r)
    }

    rec(0, a.length - 1)
    a
  }
}
/**
 * Minimal fixed-capacity FIFO ring buffer of Ints that is filled one pair at
 * a time and drained one Int at a time.
 *
 * No overflow or underflow checks are performed: adding to a full queue
 * overwrites unread values, and a completely full queue is indistinguishable
 * from an empty one. Wrap-around is done by comparison instead of bit
 * masking, so the capacity does not have to be a power of two.
 *
 * @param cap number of Int slots; must be at least 2 (room for one pair)
 * @throws IllegalArgumentException if `cap` is smaller than 2
 */
class LightQueue(cap: Int) {
  require(cap >= 2, "LightQueue capacity must be at least 2")

  private val q = new Array[Int](cap)
  private var beg = 0 // next slot to write
  private var end = 0 // next slot to read

  // Index following i, wrapping back to 0 at the end of the buffer.
  private def next(i: Int): Int = {
    val n = i + 1
    if (n == cap) 0 else n
  }

  /** Appends `a` and then `b` at the tail of the queue. */
  final def add(a: Int, b: Int): Unit = {
    q(beg) = a
    beg = next(beg)
    q(beg) = b
    beg = next(beg)
  }

  /** Removes and returns the oldest Int in the queue. */
  final def pop(): Int = {
    val ret = q(end)
    end = next(end)
    ret
  }

  /** True when the read and write positions coincide. */
  final def isEmpty: Boolean = beg == end
}
/**
 * Minimal fixed-capacity LIFO stack of Ints that is filled one pair at a time
 * and drained one Int at a time. No bounds checks are performed beyond those
 * of the underlying array.
 *
 * @param cap number of Int slots available
 */
class LightStack(cap: Int) {
  private val slots = new Array[Int](cap)
  private var top = 0 // index of the first free slot

  /**
   * Pushes a pair so that `a` ends up on top of `b`: the next two `pop()`
   * calls return `a` first and `b` second.
   */
  final def add(a: Int, b: Int): Unit = {
    slots(top) = b
    top += 1
    slots(top) = a
    top += 1
  }

  /** Removes and returns the Int on top of the stack. */
  final def pop(): Int = {
    top -= 1
    slots(top)
  }

  /** True when nothing is stored. */
  final def isEmpty: Boolean = top == 0
}
/**
 * Sequential quicksort that keeps its pending ranges on an explicit
 * `LightStack` instead of the call stack, with a random pivot and an
 * insertion-sort cut-off for short ranges.
 */
object QuickSeqQueueSorter extends Sorter {

  /**
   * Sorts `a` in place.
   *
   * @param a the values to sort (may be empty); reordered by this call
   * @return the same array instance
   */
  def sorted(a: Array[Int]): Array[Int] = {
    def swap(i: Int, j: Int): Unit = {
      val t = a(i); a(i) = a(j); a(j) = t
    }

    // Insertion sort of the inclusive range [b, e]; a no-op when e <= b.
    def isort(b: Int, e: Int): Unit = {
      var i = b + 1
      while (i <= e) {
        var j = i
        while (j > b && a(j) < a(j - 1)) {
          swap(j, j - 1)
          j -= 1
        }
        i += 1
      }
    }

    // Partitions the inclusive range [p, r] (requires r > p) around a pivot
    // drawn at random from [p, r - 1]; returns the pivot's final index.
    def partition(p: Int, r: Int): Int = {
      swap(p, p + scala.util.Random.nextInt(r - p))
      val x = a(p)
      var k = p + 1
      while (a(k) <= x && k < r) k += 1
      var l = r
      while (a(l) > x) l -= 1 // a(p) == x stops this scan at p at the latest
      while (k < l) {
        swap(k, l)
        // After the swap a(l) > x and a(k) <= x act as sentinels for the scans.
        k += 1
        while (a(k) <= x) k += 1
        l -= 1
        while (a(l) > x) l -= 1
      }
      swap(p, l)
      l
    }

    // The larger partition is always pushed first, so the smaller one is
    // processed next. That keeps the number of pending ranges logarithmic in
    // the array length (at most 34 ranges, i.e. 68 slots, for any Int-indexed
    // array), so a small fixed capacity is enough. Sizing the stack by
    // a.length instead left no room for the initial pair when the array had
    // fewer than two elements.
    val stackSlots = 128
    val pending = new LightStack(stackSlots)
    pending.add(0, a.length - 1)

    // `pending` is local to this call, so no locking is needed around it.
    while (!pending.isEmpty) {
      val p = pending.pop()
      val r = pending.pop()
      if (r - p < 19) {
        isort(p, r)
      } else {
        val q = partition(p, r)
        if (q - p < r - q) {
          pending.add(q + 1, r) // larger side waits underneath
          pending.add(p, q - 1)
        } else {
          pending.add(p, q - 1) // larger side waits underneath
          pending.add(q + 1, r)
        }
      }
    }
    a
  }
}
/**
 * Parallel quicksort. Worker threads share a stack of pending ranges; large
 * ranges are partitioned and their parts pushed back, small ranges are sorted
 * directly with a sequential quicksort.
 *
 * Coordination, as implemented below:
 *  - `semaphore` holds one permit per range waiting on `queue`;
 *  - `queue` is only touched while holding this object's monitor;
 *  - `nRemains` counts elements not yet in their final position; the worker
 *    that brings it to zero sets `isDone` and releases one permit per worker
 *    so that every worker wakes up and terminates.
 *
 * All state lives in fields of this singleton, which the worker threads
 * (running `run()`) read.
 */
object QuickParallelSorter extends Sorter with Runnable {
  import java.util.concurrent.Semaphore
  import java.util.concurrent.atomic.AtomicInteger

  var queue: LightStack = _
  var array: Array[Int] = _
  var semaphore: Semaphore = _
  var isDone: Boolean = _
  var nRemains: AtomicInteger = _
  var nThreads: Int = _

  /** Worker loop: take a range, either split it or sort it, until `isDone`. */
  def run(): Unit = {
    val a = array
    val queue = this.queue

    def swap(i: Int, j: Int): Unit = {
      val t = a(i); a(i) = a(j); a(j) = t
    }

    // Insertion sort of the inclusive range [b, e]; a no-op when e <= b.
    def isort(b: Int, e: Int): Unit = {
      var i = b + 1
      while (i <= e) {
        var j = i
        while (j > b && a(j) < a(j - 1)) {
          swap(j, j - 1)
          j -= 1
        }
        i += 1
      }
    }

    // Partitions the inclusive range [p, r] (requires r > p) around a pivot
    // drawn at random from [p, r - 1]; returns the pivot's final index.
    def partition(p: Int, r: Int): Int = {
      swap(p, p + scala.util.Random.nextInt(r - p))
      val x = a(p)
      var k = p + 1
      while (a(k) <= x && k < r) k += 1
      var l = r
      while (a(l) > x) l -= 1 // a(p) == x stops this scan at p at the latest
      while (k < l) {
        swap(k, l)
        // After the swap a(l) > x and a(k) <= x act as sentinels for the scans.
        k += 1
        while (a(k) <= x) k += 1
        l -= 1
        while (a(l) > x) l -= 1
      }
      swap(p, l)
      l
    }

    // Sequential quicksort used once a range is small enough for one worker.
    def qsort(p: Int, r: Int): Unit = {
      if (r - p < 13) isort(p, r)
      else {
        val q = partition(p, r)
        qsort(p, q - 1)
        qsort(q + 1, r)
      }
    }

    var working = true
    while (working) {
      semaphore.acquire() // wait for a pending range or for the shutdown signal
      if (isDone) {
        working = false
      } else {
        var p = 0
        var r = 0
        synchronized {
          p = queue.pop()
          r = queue.pop()
        }
        if (r - p < 19 * 10 * 3) {
          qsort(p, r)
          // All r - p + 1 elements of this range are now final.
          if (nRemains.addAndGet(p - r - 1) == 0) {
            isDone = true
            semaphore.release(nThreads)
          }
        } else {
          val q = partition(p, r)
          var nsem = 0
          synchronized {
            // Empty sub-ranges are not queued.
            if (p != q) { queue.add(p, q - 1); nsem += 1 }
            if (q != r) { queue.add(q + 1, r); nsem += 1 }
          }
          nRemains.decrementAndGet() // the pivot is final
          semaphore.release(nsem)
        }
      }
    }
  }

  /**
   * Sorts `a` in place using one worker thread per available processor.
   *
   * Arrays with fewer than two elements are returned as they are, without
   * starting any thread and without touching the shared fields.
   *
   * @param a the values to sort; reordered by this call
   * @return the same array instance
   */
  def sorted(a: Array[Int]): Array[Int] = {
    // A stack of a.length slots cannot hold the initial (lo, hi) pair when
    // the array has fewer than two elements, and such an array is already
    // sorted anyway.
    if (a.length >= 2) {
      queue = new LightStack(a.length)
      queue.add(0, a.length - 1)
      array = a
      isDone = false
      nThreads = Runtime.getRuntime.availableProcessors
      semaphore = new Semaphore(1) // one permit for the initial range
      nRemains = new AtomicInteger(a.length)
      val threads = Array.fill(nThreads)(new Thread(this))
      threads.foreach(_.start())
      threads.foreach(_.join())
    }
    a
  }
}
/**
 * Parallel quicksort that hands small ranges to `java.util.Arrays.sort`.
 * Worker threads share a stack of pending ranges; large ranges are
 * partitioned and their parts pushed back.
 *
 * Coordination, as implemented below:
 *  - `semaphore` holds one permit per range waiting on `queue`;
 *  - `queue` is only touched while holding this instance's monitor;
 *  - `nRemains` counts elements not yet in their final position; the worker
 *    that brings it to zero sets `isDone` and releases one permit per worker
 *    so that every worker wakes up and terminates.
 *
 * All state lives in fields of the instance, which the worker threads
 * (running `run()`) read.
 */
class ParallelSorter extends Sorter with Runnable {
  import java.util.concurrent.Semaphore
  import java.util.concurrent.atomic.AtomicInteger

  var queue: LightStack = null
  var array: Array[Int] = null
  var semaphore: Semaphore = null
  var isDone = false
  var nRemains: AtomicInteger = null
  var nThreads: Int = _

  /** Worker loop: take a range, either split it or sort it, until `isDone`. */
  def run(): Unit = {
    def swap(i: Int, j: Int): Unit = {
      val t = array(i); array(i) = array(j); array(j) = t
    }

    // Partitions the inclusive range [p, r] (requires r > p) around a pivot
    // drawn at random from [p, r - 1]; returns the pivot's final index.
    def partition(p: Int, r: Int): Int = {
      swap(p, p + scala.util.Random.nextInt(r - p))
      val x = array(p)
      var k = p + 1
      while (array(k) <= x && k < r) k += 1
      var l = r
      while (array(l) > x) l -= 1 // array(p) == x stops this scan at p at the latest
      while (k < l) {
        swap(k, l)
        // After the swap array(l) > x and array(k) <= x act as sentinels for the scans.
        k += 1
        while (array(k) <= x) k += 1
        l -= 1
        while (array(l) > x) l -= 1
      }
      swap(p, l)
      l
    }

    var working = true
    while (working) {
      semaphore.acquire() // wait for a pending range or for the shutdown signal
      if (isDone) {
        working = false
      } else {
        var p = 0
        var r = 0
        synchronized {
          p = queue.pop()
          r = queue.pop()
        }
        if (r - p < 19 * 10 * 3) {
          java.util.Arrays.sort(array, p, r + 1) // toIndex is exclusive
          // All r - p + 1 elements of this range are now final.
          if (nRemains.addAndGet(p - r - 1) == 0) {
            isDone = true
            semaphore.release(nThreads)
          }
        } else {
          val q = partition(p, r)
          var nsem = 0
          synchronized {
            // Empty sub-ranges are not queued.
            if (p != q) { queue.add(p, q - 1); nsem += 1 }
            if (q != r) { queue.add(q + 1, r); nsem += 1 }
          }
          nRemains.decrementAndGet() // the pivot is final
          semaphore.release(nsem)
        }
      }
    }
  }

  /**
   * Sorts `a` in place using one worker thread per available processor.
   *
   * Arrays with fewer than two elements are returned as they are, without
   * starting any thread and without touching the instance's fields.
   *
   * @param a the values to sort; reordered by this call
   * @return the same array instance
   */
  def sorted(a: Array[Int]): Array[Int] = {
    // A stack of a.length slots cannot hold the initial (lo, hi) pair when
    // the array has fewer than two elements, and such an array is already
    // sorted anyway.
    if (a.length >= 2) {
      this.queue = new LightStack(a.length)
      this.queue.add(0, a.length - 1)
      this.array = a
      this.isDone = false
      this.semaphore = new Semaphore(1) // one permit for the initial range
      this.nRemains = new AtomicInteger(a.length)
      this.nThreads = Runtime.getRuntime.availableProcessors
      val threads = Array.fill(nThreads)(new Thread(this))
      threads.foreach(_.start())
      threads.foreach(_.join())
    }
    a
  }
}
/**
 * Benchmark driver: cross-checks every sorter against the first one, then
 * times each sorter ten times on freshly shuffled input and prints the
 * average of the runs that remain after discarding the two fastest and the
 * two slowest.
 */
object Main extends App {
  import scala.collection.mutable.WrappedArray
  import scala.util.Random

  /** Supplies shuffled copies of the numbers 0 until 1000000. */
  object RandomSource {
    private val src: WrappedArray[Int] = Array.range(0, 1000000)

    /** A new array holding a random permutation of the source numbers. */
    def getShuffled: Array[Int] = Random.shuffle(src).toArray
  }

  // Quick smoke test on a tiny input.
  println(QuickSeqSorter.sorted(Array(3, 5, 1, 2, 4)).toList)

  val impls = Seq[Sorter](
    SimpleSorter,
    DivideAndMergeParallelSorter,
    DivideAndMergeParallelSorter2,
    // ShellParallelSorter,
    // ShellParallelSorterOpt,
    // ShellParallelSorterOpt2,
    // ShellParallelSorterThread,
    // ShellParallelSorterThreadPool,
    QuickSeqSorter,
    QuickSeqQueueSorter,
    QuickParallelSorter,
    new ParallelSorter
  )

  // The first implementation is the reference all others must agree with.
  check(impls.head, impls.tail: _*)

  for (sorter <- impls) {
    System.gc()
    Thread.sleep(300)
    val times = (1 to 10).map(i => benchmark(sorter, i))
    // Discard the two fastest and the two slowest runs.
    val noOutliers = times.sorted.drop(2).dropRight(2)
    val avgMillis = noOutliers.sum / noOutliers.size / 1000000
    println("avg. " + avgMillis + "[ms]")
  }

  /**
   * Times one run of `sorter` on a freshly shuffled array and prints it.
   *
   * @param sorter the implementation to measure
   * @param i      run number, used only in the printed line
   * @return elapsed time of the `sorted` call in nanoseconds
   */
  def benchmark(sorter: Sorter, i: Int): Long = {
    System.gc()
    Thread.sleep(300)
    val input = RandomSource.getShuffled
    val startedAt = System.nanoTime
    sorter.sorted(input)
    val elapsed = System.nanoTime - startedAt
    println(sorter.getClass.getName + " #" + i + ": " + (elapsed / 1000000) + "[ms]")
    elapsed
  }

  /**
   * Asserts that every sorter in `sorters` produces the same result as
   * `refSorter` on one shuffled input; each sorter gets its own copy.
   */
  def check(refSorter: Sorter, sorters: Sorter*): Unit = {
    val input = RandomSource.getShuffled
    val expected = refSorter.sorted(input.clone)
    for (s <- sorters) assert(Arrays.equals(s.sorted(input.clone), expected))
  }
}
// @ Core i7 920
SimpleSorter$ #1: 138[ms]
SimpleSorter$ #2: 135[ms]
SimpleSorter$ #3: 134[ms]
SimpleSorter$ #4: 135[ms]
SimpleSorter$ #5: 134[ms]
SimpleSorter$ #6: 135[ms]
SimpleSorter$ #7: 135[ms]
SimpleSorter$ #8: 138[ms]
SimpleSorter$ #9: 137[ms]
SimpleSorter$ #10: 135[ms]
avg. 135[ms]
DivideAndMergeParallelSorter$ #1: 112[ms]
DivideAndMergeParallelSorter$ #2: 75[ms]
DivideAndMergeParallelSorter$ #3: 76[ms]
DivideAndMergeParallelSorter$ #4: 100[ms]
DivideAndMergeParallelSorter$ #5: 88[ms]
DivideAndMergeParallelSorter$ #6: 74[ms]
DivideAndMergeParallelSorter$ #7: 75[ms]
DivideAndMergeParallelSorter$ #8: 76[ms]
DivideAndMergeParallelSorter$ #9: 88[ms]
DivideAndMergeParallelSorter$ #10: 74[ms]
avg. 80[ms]
DivideAndMergeParallelSorter2$ #1: 113[ms]
DivideAndMergeParallelSorter2$ #2: 51[ms]
DivideAndMergeParallelSorter2$ #3: 51[ms]
DivideAndMergeParallelSorter2$ #4: 46[ms]
DivideAndMergeParallelSorter2$ #5: 42[ms]
DivideAndMergeParallelSorter2$ #6: 42[ms]
DivideAndMergeParallelSorter2$ #7: 48[ms]
DivideAndMergeParallelSorter2$ #8: 50[ms]
DivideAndMergeParallelSorter2$ #9: 49[ms]
DivideAndMergeParallelSorter2$ #10: 51[ms]
avg. 49[ms]
ShellParallelSorter$ #1: 212[ms]
ShellParallelSorter$ #2: 160[ms]
ShellParallelSorter$ #3: 166[ms]
ShellParallelSorter$ #4: 188[ms]
ShellParallelSorter$ #5: 157[ms]
ShellParallelSorter$ #6: 165[ms]
ShellParallelSorter$ #7: 186[ms]
ShellParallelSorter$ #8: 158[ms]
ShellParallelSorter$ #9: 162[ms]
ShellParallelSorter$ #10: 183[ms]
avg. 170[ms]
ShellParallelSorterOpt$ #1: 124[ms]
ShellParallelSorterOpt$ #2: 110[ms]
ShellParallelSorterOpt$ #3: 145[ms]
ShellParallelSorterOpt$ #4: 90[ms]
ShellParallelSorterOpt$ #5: 98[ms]
ShellParallelSorterOpt$ #6: 85[ms]
ShellParallelSorterOpt$ #7: 84[ms]
ShellParallelSorterOpt$ #8: 107[ms]
ShellParallelSorterOpt$ #9: 114[ms]
ShellParallelSorterOpt$ #10: 144[ms]
avg. 107[ms]
ShellParallelSorterOpt2$ #1: 117[ms]
ShellParallelSorterOpt2$ #2: 80[ms]
ShellParallelSorterOpt2$ #3: 79[ms]
ShellParallelSorterOpt2$ #4: 79[ms]
ShellParallelSorterOpt2$ #5: 79[ms]
ShellParallelSorterOpt2$ #6: 80[ms]
ShellParallelSorterOpt2$ #7: 78[ms]
ShellParallelSorterOpt2$ #8: 81[ms]
ShellParallelSorterOpt2$ #9: 77[ms]
ShellParallelSorterOpt2$ #10: 78[ms]
avg. 79[ms]
ShellParallelSorterThread$ #1: 129[ms]
ShellParallelSorterThread$ #2: 125[ms]
ShellParallelSorterThread$ #3: 119[ms]
ShellParallelSorterThread$ #4: 116[ms]
ShellParallelSorterThread$ #5: 120[ms]
ShellParallelSorterThread$ #6: 123[ms]
ShellParallelSorterThread$ #7: 121[ms]
ShellParallelSorterThread$ #8: 122[ms]
ShellParallelSorterThread$ #9: 119[ms]
ShellParallelSorterThread$ #10: 122[ms]
avg. 121[ms]
ShellParallelSorterThreadPool$ #1: 448[ms]
ShellParallelSorterThreadPool$ #2: 397[ms]
ShellParallelSorterThreadPool$ #3: 389[ms]
ShellParallelSorterThreadPool$ #4: 487[ms]
ShellParallelSorterThreadPool$ #5: 473[ms]
ShellParallelSorterThreadPool$ #6: 389[ms]
ShellParallelSorterThreadPool$ #7: 396[ms]
ShellParallelSorterThreadPool$ #8: 398[ms]
ShellParallelSorterThreadPool$ #9: 404[ms]
ShellParallelSorterThreadPool$ #10: 397[ms]
avg. 407[ms]
QuickSeqSorter$ #1: 124[ms]
QuickSeqSorter$ #2: 122[ms]
QuickSeqSorter$ #3: 121[ms]
QuickSeqSorter$ #4: 121[ms]
QuickSeqSorter$ #5: 122[ms]
QuickSeqSorter$ #6: 121[ms]
QuickSeqSorter$ #7: 121[ms]
QuickSeqSorter$ #8: 122[ms]
QuickSeqSorter$ #9: 121[ms]
QuickSeqSorter$ #10: 124[ms]
avg. 122[ms]
QuickSeqQueueSorter$ #1: 131[ms]
QuickSeqQueueSorter$ #2: 126[ms]
QuickSeqQueueSorter$ #3: 125[ms]
QuickSeqQueueSorter$ #4: 124[ms]
QuickSeqQueueSorter$ #5: 124[ms]
QuickSeqQueueSorter$ #6: 123[ms]
QuickSeqQueueSorter$ #7: 126[ms]
QuickSeqQueueSorter$ #8: 123[ms]
QuickSeqQueueSorter$ #9: 124[ms]
QuickSeqQueueSorter$ #10: 124[ms]
avg. 124[ms]
QuickParallelSorter$ #1: 34[ms]
QuickParallelSorter$ #2: 32[ms]
QuickParallelSorter$ #3: 33[ms]
QuickParallelSorter$ #4: 31[ms]
QuickParallelSorter$ #5: 28[ms]
QuickParallelSorter$ #6: 31[ms]
QuickParallelSorter$ #7: 29[ms]
QuickParallelSorter$ #8: 29[ms]
QuickParallelSorter$ #9: 31[ms]
QuickParallelSorter$ #10: 29[ms]
avg. 31[ms]
ParallelSorter #1: 36[ms]
ParallelSorter #2: 140[ms]
ParallelSorter #3: 29[ms]
ParallelSorter #4: 32[ms]
ParallelSorter #5: 30[ms]
ParallelSorter #6: 142[ms]
ParallelSorter #7: 29[ms]
ParallelSorter #8: 28[ms]
ParallelSorter #9: 29[ms]
ParallelSorter #10: 141[ms]
avg. 50[ms]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment