Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
parallel radix sort
import std.range;
import std.parallelism;
import core.atomic;
import gcc.builtins;
T[] parallelRadixsort(T)(T[] arr, T[] tmparr) {
assert(arr.length == tmparr.length, "lengths");
alias Histo = int[256];
alias Histo4 = int[256][T.sizeof];
auto n = arr.length / totalCPUs;
Histo4[] localCounts;
localCounts.length = totalCPUs;
foreach(t; parallel(iota(0, totalCPUs), 1)) {
foreach (v; arr[(t*n) .. (t*n+n)]) {
foreach (b; 0 .. T.sizeof) {
uint c = (v >> (b * 8)) & 0xff;
localCounts[t][b][c] += 1;
}
}
}
Histo4 counts;
foreach (lc; localCounts) {
foreach (b; 0 .. T.sizeof) {
counts[b][] += lc[b][];
}
}
enum blockSize = 64*1024;
loop: foreach (b; 0 .. T.sizeof) {
int[256] offsets;
foreach (i; 1..256) {
offsets[i] = counts[b][i-1] + offsets[i-1];
}
shared(ulong)* ticket = new ulong(0);
foreach(bl; parallel(iota(0, arr.length / blockSize), 1)) {
auto slice = arr[bl*blockSize .. bl*blockSize+blockSize];
Histo localCount;
foreach (i, v; slice) {
uint c = (v >> (b * 8)) & 0xff;
localCount[c] += 1;
}
// hardly ideal for more CPUs or preemption
while (!cas(ticket, bl, ulong.max)) { __builtin_ia32_pause(); }
foreach (i, c; localCount) {
localCount[i] = offsets[i];
offsets[i] += c;
}
*ticket = bl+1; // could be error?
foreach(i, v; slice) {
uint c = (v >> (b * 8)) & 0xff;
tmparr.ptr[localCount[c]] = v;
localCount[c] += 1;
}
}
auto tmp = arr;
arr = tmparr;
tmparr = tmp;
}
return arr;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.