Skip to content

Instantly share code, notes, and snippets.

@hrj
Forked from akihiro4chawon/parasort.scala
Last active December 24, 2015 07:09
Show Gist options
  • Save hrj/6761561 to your computer and use it in GitHub Desktop.
Save hrj/6761561 to your computer and use it in GitHub Desktop.
import java.util.Arrays
/** Small concurrency helper shared by the sorters in this file. */
object Util {
  import concurrent._
  import ExecutionContext.Implicits.global

  /**
   * Runs two side-effecting computations concurrently on the global execution
   * context and blocks the caller until BOTH have finished.
   *
   * If either computation throws, the exception is rethrown to the caller
   * (the first computation's failure takes precedence when both fail).
   *
   * @param f1 first computation, evaluated on a pool thread
   * @param f2 second computation, evaluated on a pool thread
   */
  def par(f1: => Unit, f2: => Unit): Unit = {
    val fut1 = Future { f1 }
    val fut2 = Future { f2 }
    val forever = duration.Duration.Inf
    // Wait for both halves first: a zipped future completes as soon as one
    // side fails, which would let the caller resume while the other side is
    // still mutating shared data.
    Await.ready(fut1, forever)
    Await.ready(fut2, forever)
    // Both are complete now; Await.result (unlike Await.ready) rethrows a failure.
    Await.result(fut1, forever)
    Await.result(fut2, forever)
  }
}
import Util._
/**
 * Common interface implemented by every sorting strategy in this file.
 */
abstract class Sorter {
/**
 * Returns the elements of `a` in ascending order.
 *
 * Implementations in this file differ in how they treat the argument: some
 * sort `a` in place and return that same array, others return a newly
 * allocated array. Callers should therefore use only the returned array and
 * should not assume `a` is left unmodified.
 */
def sorted(a: Array[Int]): Array[Int]
}
/** Baseline sorter: hands the whole array to the JDK primitive sort. */
object SimpleSorter extends Sorter {
  /** Sorts `a` in place over its full range and returns that same array instance. */
  def sorted(a: Array[Int]): Array[Int] = {
    Arrays.sort(a, 0, a.length)
    a
  }
}
/**
 * Two-way parallel sort: the two halves of the input are sorted concurrently
 * in place, then merged into a freshly allocated result array.
 */
object DivideAndMergeParallelSorter extends Sorter {
  /**
   * Returns a new array holding the elements of `a` in ascending order.
   *
   * Side effect: `a` itself is left with each half individually sorted.
   * Arrays of any length are accepted, including empty and single-element ones.
   */
  def sorted(a: Array[Int]): Array[Int] = {
    val len = a.length
    val half = len / 2
    par(Arrays.sort(a, 0, half), Arrays.sort(a, half, len))

    val ret = new Array[Int](len)
    var out = 0     // next free slot in ret
    var left = 0    // cursor in a[0, half)
    var right = half // cursor in a[half, len)
    // Bounds are checked before every read, so an empty half is harmless.
    while (left < half && right < len) {
      if (a(left) <= a(right)) {
        ret(out) = a(left)
        left += 1
      } else {
        ret(out) = a(right)
        right += 1
      }
      out += 1
    }
    // Exactly one side may still hold elements; copy that tail in one go.
    if (left < half) System.arraycopy(a, left, ret, out, half - left)
    else System.arraycopy(a, right, ret, out, len - right)
    ret
  }
}
/**
 * N-way parallel sort: the input is copied into one slice per available
 * processor, the slices are sorted in parallel, and the sorted slices are
 * merged pairwise by a parallel reduce.
 */
object DivideAndMergeParallelSorter2 extends Sorter {
  /**
   * Returns a new array holding the elements of `a` in ascending order.
   * The input array is only read (slices are copies). Arrays shorter than the
   * processor count, including empty ones, are accepted: the surplus slices
   * are simply empty.
   */
  def sorted(a: Array[Int]): Array[Int] = {
    val nDiv = scala.collection.parallel.availableProcessors
    val len = a.length

    // Slice boundary i of nDiv; computed in Long because i * len can exceed Int range.
    def bound(i: Int): Int = (i.toLong * len / nDiv).toInt

    val pslices = (0 until nDiv).par map { i => Arrays.copyOfRange(a, bound(i), bound(i + 1)) }
    pslices foreach (slice => Arrays.sort(slice))

    // Merges two ascending arrays into a new ascending array; either may be empty.
    def merge(left: Array[Int], right: Array[Int]): Array[Int] = {
      val llen = left.length
      val rlen = right.length
      val ret = new Array[Int](llen + rlen)
      var out = 0
      var l = 0
      var r = 0
      while (l < llen && r < rlen) {
        if (left(l) <= right(r)) {
          ret(out) = left(l)
          l += 1
        } else {
          ret(out) = right(r)
          r += 1
        }
        out += 1
      }
      // Copy whichever tail remains.
      if (l < llen) System.arraycopy(left, l, ret, out, llen - l)
      else System.arraycopy(right, r, ret, out, rlen - r)
      ret
    }

    pslices reduce merge
  }
}
/**
 * Shell sort over the gap sequence 1, 4, 13, 40, ... (g -> 3g + 1), where for
 * each gap the `gap` interleaved sub-sequences are insertion-sorted through a
 * parallel range.
 */
object ShellParallelSorter extends Sorter {
  /** Sorts `a` in place and returns that same array instance. */
  def sorted(a: Array[Int]) = {
    val n = a.length
    // First gap of the sequence that exceeds n, stepped back by one division.
    val firstGap = Iterator.iterate(1)(g => g * 3 + 1).dropWhile(_ <= n).next() / 3

    var gap = firstGap
    while (gap >= 1) {
      val stride = gap
      // Sub-sequence `start` holds indices start, start + stride, ...; the
      // sub-sequences are disjoint, so each one is handled independently.
      (0 until stride).par.foreach { start =>
        var pos = start
        while (pos < n) {
          val moving = a(pos)
          var hole = pos
          while (hole >= stride && a(hole - stride) > moving) {
            a(hole) = a(hole - stride)
            hole -= stride
          }
          a(hole) = moving
          pos += stride
        }
      }
      gap /= 3
    }
    a
  }
}
/**
 * Shell sort with the 3g + 1 gap sequence, driven by plain loops; for each
 * gap the interleaved sub-sequences are insertion-sorted via a parallel range.
 */
object ShellParallelSorterOpt extends Sorter {
  /** Sorts `a` in place and returns that same array instance. */
  def sorted(a: Array[Int]) = {
    val n = a.length

    // Insertion-sorts the elements at indices offset, offset + step, ...
    def sortChain(offset: Int, step: Int): Unit = {
      var idx = offset
      while (idx < n) {
        val carried = a(idx)
        var slot = idx
        while (slot >= step && a(slot - step) > carried) {
          a(slot) = a(slot - step)
          slot -= step
        }
        a(slot) = carried
        idx += step
      }
    }

    // Grow the gap until it reaches n, then walk it back down by thirds.
    var gap = 1
    while (gap < n) gap = gap * 3 + 1
    gap /= 3
    while (gap >= 1) {
      val step = gap
      (0 until step).par.foreach(offset => sortChain(offset, step))
      gap /= 3
    }
    a
  }
}
/**
 * Shell sort variant that skips the largest gaps of the 3g + 1 sequence, runs
 * the remaining gaps greater than 1 through a parallel range, and finishes
 * with a sequential gap-1 insertion sort.
 */
object ShellParallelSorterOpt2 extends Sorter {
  /** Sorts `a` in place and returns that same array instance. */
  def sorted(a: Array[Int]): Array[Int] = {
    val len = a.length
    var h = 1
    while (h < len) h = h * 3 + 1
    // Deliberately drop the two largest gaps before the parallel passes start.
    h /= 3
    h /= 3
    while ({ h /= 3; h > 1 }) {
      val gap = h
      (0 until gap).par.foreach { k =>
        var i = k
        while (i < len) {
          val v = a(i)
          var j = i
          while (j >= gap && a(j - gap) > v) {
            a(j) = a(j - gap)
            j -= gap
          }
          a(j) = v
          i += gap
        }
      }
    }
    // Final pass: plain insertion sort with a fixed gap of 1. It must not
    // reuse `h`, which may be 0 here for short arrays; a zero gap would turn
    // this pass into a no-op and leave the array unsorted.
    var i = 1
    while (i < len) {
      val v = a(i)
      var j = i
      while (j >= 1 && a(j - 1) > v) {
        a(j) = a(j - 1)
        j -= 1
      }
      a(j) = v
      i += 1
    }
    a
  }
}
/**
 * Shell sort that spawns raw threads for every gap greater than 1 and ends
 * with a sequential gap-1 insertion sort.
 *
 * The working state lives in the fields of this singleton, so a single call
 * to `sorted` must be in progress at any time.
 */
object ShellParallelSorterThread extends Sorter with Runnable {
  /** Index of the next sub-sequence to hand out; workers count it down to below zero. */
  val count = new java.util.concurrent.atomic.AtomicInteger
  /** Gap of the pass currently being executed. */
  var h: Int = 1
  /** Array currently being sorted. */
  var a: Array[Int] = null

  /**
   * Worker body: repeatedly claims a sub-sequence index from `count` and
   * insertion-sorts the elements at that index, index + h, index + 2h, ...
   * until no indices remain.
   */
  def run(): Unit = {
    val a = this.a
    val h = this.h
    val len = a.length
    var start = count.getAndDecrement
    while (start >= 0) {
      var i = start
      while (i < len) {
        val v = a(i)
        var j = i
        while (j >= h && (a(j - h) > v)) {
          a(j) = a(j - h)
          j -= h
        }
        a(j) = v
        i += h
      }
      start = count.getAndDecrement
    }
  }

  /** Sorts `a` in place and returns that same array instance. */
  def sorted(a: Array[Int]): Array[Int] = {
    this.a = a
    val len = a.length
    // Start every call from a known gap instead of whatever the previous call left behind.
    h = 1
    while (h < len) h = h * 3 + 1
    // The largest gap is skipped on purpose.
    h /= 3
    while ({ h /= 3; h > 1 }) {
      count.set(h - 1)
      // The calling thread takes part too, hence one thread fewer than processors.
      val nThreads = (Runtime.getRuntime.availableProcessors - 1) min (h - 1)
      val pool = Array.fill(nThreads)(new Thread(this))
      pool.foreach(_.start())
      run()
      pool.foreach(_.join())
    }
    // Final pass: plain insertion sort with a fixed gap of 1. It must not
    // reuse `h`, which can be 0 here for short arrays; a zero gap would make
    // this pass a no-op and leave the array unsorted.
    var i = 1
    while (i < len) {
      val v = a(i)
      var j = i
      while (j >= 1 && a(j - 1) > v) {
        a(j) = a(j - 1)
        j -= 1
      }
      a(j) = v
      i += 1
    }
    a
  }
}
/**
 * Shell sort that submits one task per sub-sequence to a fixed thread pool
 * for every gap greater than 1 and ends with a sequential gap-1 insertion sort.
 *
 * The working state lives in the fields of this singleton, so a single call
 * to `sorted` must be in progress at any time.
 */
object ShellParallelSorterThreadPool extends Sorter with Runnable {
  import java.util.concurrent._

  /** Index of the next sub-sequence to hand out to a task. */
  val count = new java.util.concurrent.atomic.AtomicInteger
  /** Counts the tasks still outstanding for the current gap. */
  var latch: CountDownLatch = null
  /** Gap of the pass currently being executed. */
  var h: Int = 1
  /** Array currently being sorted. */
  var a: Array[Int] = null

  /**
   * Task body: claims one sub-sequence index from `count`, insertion-sorts the
   * elements at that index, index + h, index + 2h, ..., then counts the latch down.
   */
  def run(): Unit = {
    try {
      val a = this.a
      val h = this.h
      val len = a.length
      var i = count.getAndDecrement
      while (i < len) {
        val v = a(i)
        var j = i
        while (j >= h && (a(j - h) > v)) {
          a(j) = a(j - h)
          j -= h
        }
        a(j) = v
        i += h
      }
    } finally {
      // Always signal completion, otherwise a failing task would leave
      // `sorted` waiting on the latch forever.
      latch.countDown()
    }
  }

  /** Sorts `a` in place and returns that same array instance. */
  def sorted(a: Array[Int]): Array[Int] = {
    this.a = a
    val len = a.length
    // Start every call from a known gap instead of whatever the previous call left behind.
    h = 1
    while (h < len) h = h * 3 + 1
    // The largest gap is skipped on purpose.
    h /= 3
    val pool = Executors.newFixedThreadPool(Runtime.getRuntime.availableProcessors)
    try {
      while ({ h /= 3; h > 1 }) {
        count.set(h - 1)
        latch = new CountDownLatch(h)
        var submitted = 0
        while (submitted < h) {
          pool.execute(this)
          submitted += 1
        }
        latch.await()
      }
    } finally {
      // Release the worker threads; without this every call leaks a pool of
      // non-daemon threads that also keeps the JVM from exiting.
      pool.shutdown()
    }
    // Final pass: plain insertion sort with a fixed gap of 1. It must not
    // reuse `h`, which can be 0 here for short arrays; a zero gap would make
    // this pass a no-op and leave the array unsorted.
    var i = 1
    while (i < len) {
      val v = a(i)
      var j = i
      while (j >= 1 && a(j - 1) > v) {
        a(j) = a(j - 1)
        j -= 1
      }
      a(j) = v
      i += 1
    }
    a
  }
}
/**
 * Sequential recursive quicksort with a randomly chosen pivot; ranges spanning
 * fewer than 8 elements are finished with insertion sort.
 */
object QuickSeqSorter extends Sorter {
  /** Sorts `a` in place and returns that same array instance. */
  def sorted(a: Array[Int]) = {
    // Exchanges the elements at positions x and y.
    def swap(x: Int, y: Int): Unit = {
      val held = a(x)
      a(x) = a(y)
      a(y) = held
    }

    // Insertion sort over the inclusive range [lo, hi].
    def insertionSort(lo: Int, hi: Int): Unit = {
      var next = lo + 1
      while (next <= hi) {
        var at = next
        while (at > lo && a(at) < a(at - 1)) {
          swap(at, at - 1)
          at -= 1
        }
        next += 1
      }
    }

    // Partitions the inclusive range [lo, hi] around a random pivot taken from
    // [lo, hi) and returns the pivot's final index.
    def split(lo: Int, hi: Int): Int = {
      swap(lo, lo + scala.util.Random.nextInt(hi - lo))
      val pivot = a(lo)
      var up = lo + 1
      while (a(up) <= pivot && up < hi) up += 1
      var down = hi
      // a(lo) == pivot stops this scan at lo at the latest.
      while (a(down) > pivot) down -= 1
      while (up < down) {
        swap(up, down)
        // The value just moved to `down` is greater than the pivot, so it
        // bounds the upward scan without an explicit range check.
        up += 1
        while (a(up) <= pivot) up += 1
        down -= 1
        while (a(down) > pivot) down -= 1
      }
      swap(lo, down)
      down
    }

    def sortRange(lo: Int, hi: Int): Unit =
      if (hi - lo < 7) insertionSort(lo, hi)
      else {
        val mid = split(lo, hi)
        sortRange(lo, mid - 1)
        sortRange(mid + 1, hi)
      }

    sortRange(0, a.length - 1)
    a
  }
}
/**
 * Minimal ring-buffer FIFO of Ints that is filled two values at a time and
 * drained one value at a time.
 *
 * Indices wrap with a bit mask, which is only correct for power-of-two sizes,
 * so the requested capacity is rounded up to the next power of two (minimum 2).
 * There is no overflow detection: adding more values than the buffer holds
 * overwrites the oldest unread entries.
 *
 * @param cap requested capacity in Ints
 */
class LightQueue(cap: Int) {
  // Smallest power of two >= max(cap, 2); keeps the mask-based wrap valid for any cap.
  private val size = Integer.highestOneBit(math.max(cap, 2) - 1) << 1
  private val mask = size - 1
  private val q = new Array[Int](size)
  private var beg = 0 // write cursor (always even)
  private var end = 0 // read cursor

  /** Appends `a` followed by `b`. */
  final def add(a: Int, b: Int): Unit = {
    // size is even and beg is always even, so beg + 1 is in range before wrapping.
    q(beg) = a
    q(beg + 1) = b
    beg = (beg + 2) & mask
  }

  /** Removes and returns the oldest value; the result is unspecified when the queue is empty. */
  final def pop() = {
    val ret = q(end)
    end = (end + 1) & mask
    ret
  }

  /** True when the read cursor has caught up with the write cursor. */
  final def isEmpty = beg == end
}
/**
 * Minimal LIFO stack of Ints that is filled in pairs and drained one value at
 * a time: after `add(a, b)`, the next `pop()` returns `a` and the one after
 * that returns `b`.
 *
 * The backing array grows on demand, so `cap` is only an initial size hint.
 * The class does no locking of its own.
 *
 * @param cap initial capacity in Ints
 */
class LightStack(cap: Int) {
  private var q = new Array[Int](cap)
  private var end = 0 // number of Ints currently stored

  /** Pushes the pair so that `a` is popped first and `b` second. */
  final def add(a: Int, b: Int): Unit = {
    // Grow instead of overrunning the array (covers capacities 0 and 1 too).
    if (end + 2 > q.length)
      q = java.util.Arrays.copyOf(q, math.max(end + 2, q.length * 2))
    q(end) = b
    end += 1
    q(end) = a
    end += 1
  }

  /** Removes and returns the top value; throws ArrayIndexOutOfBoundsException when empty. */
  final def pop() = {
    end -= 1
    q(end)
  }

  /** True when no values are stored. */
  final def isEmpty = end == 0
}
/**
 * Sequential quicksort that replaces recursion with an explicit stack of
 * pending index ranges; ranges spanning fewer than 20 elements are finished
 * with insertion sort.
 */
object QuickSeqQueueSorter extends Sorter {
  /**
   * Sorts `a` in place and returns that same array instance.
   * Arrays of any length are accepted, including empty and single-element ones.
   */
  def sorted(a: Array[Int]): Array[Int] = {
    // Insertion sort over the inclusive range [b, e].
    def isort(b: Int, e: Int): Unit = {
      var i = b + 1
      while (i <= e) {
        var j = i
        while (j > b && a(j) < a(j - 1)) {
          val t = a(j); a(j) = a(j - 1); a(j - 1) = t
          j -= 1
        }
        i += 1
      }
    }

    // Partitions the inclusive range [p, r] around a random pivot taken from
    // [p, r) and returns the pivot's final index. Requires r > p.
    def partition(p: Int, r: Int): Int = {
      val i = p + scala.util.Random.nextInt(r - p)
      val x = a(i); a(i) = a(p); a(p) = x // x is the pivot, now parked at p
      var k = p
      var l = r + 1
      while ({ k += 1; (a(k) <= x && k < r) }) {}
      while ({ l -= 1; a(l) > x }) {}
      while (k < l) {
        val t = a(k); a(k) = a(l); a(l) = t
        // a(l) now holds a value greater than x and bounds the upward scan.
        while ({ k += 1; a(k) <= x }) {}
        while ({ l -= 1; a(l) > x }) {}
      }
      val t = a(p); a(p) = a(l); a(l) = t
      l
    }

    // Fewer than two elements: nothing to sort, and the first range would not
    // fit into a stack sized by a.length.
    if (a.length >= 2) {
      val queue = new LightStack(a.length)
      queue.add(0, a.length - 1)
      while (!queue.isEmpty) {
        val p = queue.pop()
        val r = queue.pop()
        if (r - p < 19) {
          isort(p, r)
        } else {
          val q = partition(p, r)
          // Only ranges with at least two elements need further work. Skipping
          // the rest also keeps the stack within a.length Ints: pending ranges
          // are disjoint and each costs two slots for at least two elements.
          if (q - p >= 2) queue.add(p, q - 1)
          if (r - q >= 2) queue.add(q + 1, r)
        }
      }
    }
    a
  }
}
/**
 * Multi-threaded quicksort. Worker threads share a stack of pending index
 * ranges: large ranges are partitioned and their parts pushed back, ranges
 * spanning fewer than 570 elements are sorted directly by the worker.
 *
 * Coordination:
 *  - `semaphore` holds one permit per range waiting on the stack;
 *  - `nRemains` counts elements not yet in their final position (a pivot
 *    counts as placed once its range is partitioned);
 *  - when `nRemains` reaches zero the finishing worker sets `isDone` and
 *    releases one permit per thread so that every worker wakes up and exits.
 *
 * All of this state lives in the fields of this singleton, so a single call
 * to `sorted` must be in progress at any time.
 */
object QuickParallelSorter extends Sorter with Runnable {
  import java.util.concurrent.Semaphore
  import java.util.concurrent.atomic.AtomicInteger

  var queue: LightStack = _
  var array: Array[Int] = _
  var semaphore: Semaphore = _
  var isDone: Boolean = _
  var nRemains: AtomicInteger = _
  var nThreads: Int = _

  /** Worker loop: takes ranges off the shared stack until the sort is flagged as done. */
  def run(): Unit = {
    val a = array
    val queue = this.queue

    // Insertion sort over the inclusive range [b, e].
    def isort(b: Int, e: Int): Unit = {
      var i = b + 1
      while (i <= e) {
        var j = i
        while (j > b && a(j) < a(j - 1)) {
          val t = a(j); a(j) = a(j - 1); a(j - 1) = t
          j -= 1
        }
        i += 1
      }
    }

    // Partitions the inclusive range [p, r] around a random pivot taken from
    // [p, r) and returns the pivot's final index. Requires r > p.
    def partition(p: Int, r: Int): Int = {
      val i = p + scala.util.Random.nextInt(r - p)
      val x = a(i); a(i) = a(p); a(p) = x // x is the pivot, now parked at p
      var k = p
      var l = r + 1
      while ({ k += 1; (a(k) <= x && k < r) }) {}
      while ({ l -= 1; a(l) > x }) {}
      while (k < l) {
        val t = a(k); a(k) = a(l); a(l) = t
        // a(l) now holds a value greater than x and bounds the upward scan.
        while ({ k += 1; a(k) <= x }) {}
        while ({ l -= 1; a(l) > x }) {}
      }
      val t = a(p); a(p) = a(l); a(l) = t
      l
    }

    // Recursive quicksort used once a range is small enough for one thread.
    def qsort(p: Int, r: Int): Unit = {
      if (r - p < 13)
        isort(p, r)
      else {
        val q = partition(p, r)
        qsort(p, q - 1)
        qsort(q + 1, r)
      }
    }

    var running = true
    while (running) {
      semaphore.acquire()
      if (isDone) {
        running = false
      } else {
        var p = 0
        var r = 0
        synchronized {
          p = queue.pop()
          r = queue.pop()
        }
        if (r - p < 19 * 10 * 3) {
          qsort(p, r)
          // All r - p + 1 elements of this range are now final.
          if (nRemains.addAndGet(p - r - 1) == 0) {
            isDone = true
            semaphore.release(nThreads)
          }
        } else {
          val q = partition(p, r)
          var nsem = 0
          synchronized {
            // Empty sides are never pushed, so every permit maps to real work.
            if (p != q) { queue.add(p, q - 1); nsem += 1 }
            if (q != r) { queue.add(q + 1, r); nsem += 1 }
          }
          nRemains.decrementAndGet // the pivot is in its final position
          semaphore.release(nsem)
        }
      }
    }
  }

  /**
   * Sorts `a` in place and returns that same array instance.
   * Arrays with fewer than two elements are returned as they are.
   */
  def sorted(a: Array[Int]): Array[Int] = {
    // Nothing to sort, and the first range would not fit into a stack sized
    // by a.length, so bail out before any shared state is set up.
    if (a.length >= 2) {
      queue = new LightStack(a.length)
      queue.add(0, a.length - 1)
      array = a
      isDone = false
      nThreads = Runtime.getRuntime.availableProcessors
      semaphore = new Semaphore(1) // one permit for the initial whole-array range
      nRemains = new AtomicInteger(a.length)
      val threads = Array.fill(nThreads)(new Thread(this))
      threads.foreach(_.start())
      threads.foreach(_.join())
    }
    a
  }
}
/**
 * Multi-threaded quicksort, instance-based. Worker threads share a stack of
 * pending index ranges: large ranges are partitioned and their parts pushed
 * back, ranges spanning fewer than 570 elements are handed to
 * `java.util.Arrays.sort`.
 *
 * Coordination:
 *  - `semaphore` holds one permit per range waiting on the stack;
 *  - `nRemains` counts elements not yet in their final position (a pivot
 *    counts as placed once its range is partitioned);
 *  - when `nRemains` reaches zero the finishing worker sets `isDone` and
 *    releases one permit per thread so that every worker wakes up and exits.
 *
 * The state lives in the instance's fields, so one instance must run a single
 * `sorted` call at a time.
 */
class ParallelSorter extends Sorter with Runnable {
  import java.util.concurrent.Semaphore
  import java.util.concurrent.atomic.AtomicInteger

  var queue: LightStack = null
  var array: Array[Int] = null
  var semaphore: Semaphore = null
  var isDone = false
  var nRemains: AtomicInteger = null
  var nThreads: Int = _

  /** Worker loop: takes ranges off the shared stack until the sort is flagged as done. */
  def run(): Unit = {
    // Partitions the inclusive range [p, r] around a random pivot taken from
    // [p, r) and returns the pivot's final index. Requires r > p.
    def partition(p: Int, r: Int): Int = {
      val i = p + scala.util.Random.nextInt(r - p)
      val x = array(i); array(i) = array(p); array(p) = x // x is the pivot, now parked at p
      var k = p
      var l = r + 1
      while ({ k += 1; (array(k) <= x && k < r) }) {}
      while ({ l -= 1; array(l) > x }) {}
      while (k < l) {
        val t = array(k); array(k) = array(l); array(l) = t
        // array(l) now holds a value greater than x and bounds the upward scan.
        while ({ k += 1; array(k) <= x }) {}
        while ({ l -= 1; array(l) > x }) {}
      }
      val t = array(p); array(p) = array(l); array(l) = t
      l
    }

    var running = true
    while (running) {
      semaphore.acquire()
      if (isDone) {
        running = false
      } else {
        var p = 0
        var r = 0
        synchronized {
          p = queue.pop()
          r = queue.pop()
        }
        if (r - p < 19 * 10 * 3) {
          java.util.Arrays.sort(array, p, r + 1)
          // All r - p + 1 elements of this range are now final.
          if (nRemains.addAndGet(p - r - 1) == 0) {
            isDone = true
            semaphore.release(nThreads)
          }
        } else {
          val q = partition(p, r)
          var nsem = 0
          synchronized {
            // Empty sides are never pushed, so every permit maps to real work.
            if (p != q) { queue.add(p, q - 1); nsem += 1 }
            if (q != r) { queue.add(q + 1, r); nsem += 1 }
          }
          nRemains.decrementAndGet // the pivot is in its final position
          semaphore.release(nsem)
        }
      }
    }
  }

  /**
   * Sorts `a` in place and returns that same array instance.
   * Arrays with fewer than two elements are returned as they are.
   */
  def sorted(a: Array[Int]): Array[Int] = {
    // Nothing to sort, and the first range would not fit into a stack sized
    // by a.length, so bail out before any shared state is set up.
    if (a.length >= 2) {
      this.queue = new LightStack(a.length)
      this.queue.add(0, a.length - 1)
      this.array = a
      this.isDone = false
      this.semaphore = new Semaphore(1) // one permit for the initial whole-array range
      this.nRemains = new AtomicInteger(a.length)
      this.nThreads = Runtime.getRuntime.availableProcessors
      val threads = Array.fill(nThreads)(new Thread(this))
      threads.foreach(_.start())
      threads.foreach(_.join())
    }
    a
  }
}
/**
 * Benchmark driver: first verifies every sorter against the first one on a
 * shuffled array, then times each sorter over ten runs and prints a trimmed
 * average in milliseconds.
 */
object Main extends App {
  import scala.collection.mutable.WrappedArray
  import scala.util.Random

  /** Supplies freshly shuffled arrays holding the integers 0 until 1000000. */
  object RandomSource {
    private val src: WrappedArray[Int] = Array.range(0, 1000000)
    def getShuffled = scala.util.Random.shuffle(src).toArray
  }

  // The first entry acts as the reference implementation for `check`.
  val impls = Seq(
    SimpleSorter,
    DivideAndMergeParallelSorter,
    DivideAndMergeParallelSorter2,
    ShellParallelSorter,
    ShellParallelSorterOpt,
    ShellParallelSorterOpt2,
    ShellParallelSorterThread,
    ShellParallelSorterThreadPool,
    QuickSeqSorter,
    QuickSeqQueueSorter,
    QuickParallelSorter,
    new ParallelSorter
  )

  check(impls.head, impls.tail: _*)

  for (sorter <- impls) {
    System.gc()
    Thread.sleep(300)
    val times = (1 to 10).map(round => benchmark(sorter, round))
    // Discard the two fastest and the two slowest runs before averaging.
    val trimmed = times.sorted.drop(2).dropRight(2)
    val averageMs = trimmed.sum / trimmed.size / 1000000
    println(sorter.getClass.getName + " avg. " + averageMs + "[ms]")
  }

  /**
   * Times one sort of a freshly shuffled array.
   *
   * @param sorter implementation under test
   * @param i      run number (not used in the measurement)
   * @return elapsed time of the `sorted` call in nanoseconds
   */
  def benchmark(sorter: Sorter, i: Int) = {
    System.gc()
    Thread.sleep(300)
    val input = RandomSource.getShuffled
    val startedAt = System.nanoTime
    sorter.sorted(input)
    System.nanoTime - startedAt
  }

  /**
   * Asserts that each sorter in `sorters` yields the same array as `refSorter`
   * when given a copy of the same shuffled input.
   */
  def check(refSorter: Sorter, sorters: Sorter*): Unit = {
    val input = RandomSource.getShuffled
    val expected = refSorter.sorted(input.clone)
    for (candidate <- sorters)
      assert(Arrays.equals(candidate.sorted(input.clone), expected))
  }
}
// Top-level statement that starts the benchmark by invoking Main's entry point.
// NOTE(review): `args` is not defined anywhere in this file; presumably it is
// supplied by the Scala script runner. Confirm how this file is launched.
Main.main(args)
# i7-3610QM CPU @ 2.30GHz
Main$$anon$1$SimpleSorter$ avg. 83[ms]
Main$$anon$1$DivideAndMergeParallelSorter$ avg. 58[ms]
Main$$anon$1$DivideAndMergeParallelSorter2$ avg. 31[ms]
Main$$anon$1$ShellParallelSorter$ avg. 92[ms]
Main$$anon$1$ShellParallelSorterOpt$ avg. 82[ms]
Main$$anon$1$ShellParallelSorterOpt2$ avg. 74[ms]
Main$$anon$1$ShellParallelSorterThread$ avg. 114[ms]
Main$$anon$1$ShellParallelSorterThreadPool$ avg. 298[ms]
Main$$anon$1$QuickSeqSorter$ avg. 92[ms]
Main$$anon$1$QuickSeqQueueSorter$ avg. 91[ms]
Main$$anon$1$QuickParallelSorter$ avg. 32[ms]
Main$$anon$1$ParallelSorter avg. 29[ms]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment