parallel: a parallel MSD-first radix sort for uint64_t keys. Gist by @kumagi, created February 1, 2016.
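
The sort below references four helpers that the gist itself does not define: cpu_num(), set_affinity(), spin_barrier, and local_sum. The definitions here are minimal sketches reconstructed from how the sort uses them; the author's actual implementations may differ.

// Headers for the helper sketches and for the sort itself.
#include <algorithm>
#include <array>
#include <atomic>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <memory>
#include <thread>
#include <vector>
#ifdef __linux__
#include <pthread.h>  // pthread_setaffinity_np (may require _GNU_SOURCE)
#include <sched.h>
#endif

// Assumed: number of worker threads to use.
inline int cpu_num() {
  const unsigned n = std::thread::hardware_concurrency();
  return n == 0 ? 1 : static_cast<int>(n);
}

// Assumed: pin the calling thread to one CPU (Linux-only; no-op elsewhere).
inline void set_affinity(int cpu) {
#ifdef __linux__
  cpu_set_t set;
  CPU_ZERO(&set);
  CPU_SET(cpu, &set);
  pthread_setaffinity_np(pthread_self(), sizeof(set), &set);
#else
  (void)cpu;
#endif
}

// Assumed: a reusable counting spin barrier for a fixed number of threads.
class spin_barrier {
 public:
  explicit spin_barrier(int n) : total_(n), waiting_(0), phase_(0) {}
  void wait(int /*tid*/) {
    const int my_phase = phase_.load(std::memory_order_acquire);
    if (waiting_.fetch_add(1, std::memory_order_acq_rel) == total_ - 1) {
      waiting_.store(0, std::memory_order_relaxed);           // reset for reuse
      phase_.store(my_phase + 1, std::memory_order_release);  // release everyone
    } else {
      while (phase_.load(std::memory_order_acquire) == my_phase) {
      }  // spin until the last thread arrives
    }
  }
 private:
  const int total_;
  std::atomic<int> waiting_;
  std::atomic<int> phase_;
};

// Assumed: one slot of the cross-thread prefix-sum chain in phase 2 below.
struct local_sum {
  uint64_t sum = 0;
  std::atomic<bool> finished{false};
};
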
// MSD-first parallel radix sort for 64-bit keys: one parallel pass partitions
// the input by its most significant byte, then worker threads claim whole
// buckets and finish each one with LSD passes over the remaining 7 bytes.
void msd_first_parallel_radix_sort(const std::vector<uint64_t>::iterator& begin,
                                   const std::vector<uint64_t>::iterator& end) {
  const auto size = std::distance(begin, end);
  const int nthreads = cpu_num();  // number of worker threads
  constexpr size_t bytes = sizeof(uint64_t);
  constexpr size_t nbuckets = 256;  // one bucket per byte value
  constexpr int kOutBuff = 8;       // 8 x 8 bytes = one 64-byte cache line
  // per-thread histogram of the most significant byte
  std::vector<std::array<uint64_t, nbuckets>> offsets(nthreads);
  for (int i = 0; i < nthreads; ++i) {
    for (size_t j = 0; j < nbuckets; ++j) {
      offsets[i][j] = 0;
    }
  }
  spin_barrier barrier(nthreads);
  std::unique_ptr<uint64_t[]> buff(new uint64_t[size]);  // scratch buffer
  std::vector<local_sum> pref_sums(nthreads + 1);        // prefix-sum chain
  std::array<uint64_t, nbuckets> msd_begin;  // bucket bounds after MSD pass
  std::array<uint64_t, nbuckets> msd_end;
  std::vector<std::thread> workers;
  workers.reserve(nthreads);
  std::atomic<int> target_bucket = {0};  // next MSD bucket to be LSD-sorted
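  // Each worker runs four phases, separated by barriers:
  //   1. histogram its slice of the input by the most significant byte
  //   2. cooperatively turn the histograms into global bucket offsets
  //   3. scatter its slice into the scratch buffer, partitioned by MSD
  //   4. claim whole buckets and finish them with LSD passes on bytes 0..6
  // Phase 4 needs no barrier: buckets are disjoint and claimed atomically.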
  for (int tid = nthreads - 1; 0 <= tid; --tid) {
    auto work = [&, tid] {
      set_affinity(tid);
      uint64_t* src = &*begin;
      uint64_t* dst = buff.get();
      // this thread's slice of the input
      uint64_t* const from = &src[size * tid / nthreads];
      uint64_t* const to = &src[size * (tid + 1) / nthreads];
      // cache-line-aligned write-combining buffer:
      // kOutBuff keys (one 64-byte line) per bucket
      char working_space[nbuckets * 8 * kOutBuff + 63];
      auto& wbuffer =
          *reinterpret_cast<std::array<std::array<uint64_t, kOutBuff>, nbuckets>*>(
              (reinterpret_cast<intptr_t>(working_space) + 63) & ~63llu);
      std::array<uint8_t, nbuckets> wbuff_n = {0};  // fill level per bucket
      std::array<uint64_t, nbuckets>& my_offset = offsets[tid];
      { // phase 1: count this slice's keys by most significant byte
        for (auto* ptr = from; ptr != to; ++ptr) {
          ++my_offset[(*ptr >> ((bytes - 1) * 8)) & (nbuckets - 1)];
        }
      }
      barrier.wait(tid);
      { // phase 2: cooperative prefix sum -> global write offsets per bucket
        // each thread sums its own contiguous range of buckets...
        uint64_t base_sum = 0;
        const size_t begin_bucket = nbuckets * tid / nthreads;
        const size_t end_bucket = nbuckets * (tid + 1) / nthreads;
        for (int k = 0; k < nthreads; ++k) {
          for (size_t j = begin_bucket; j < end_bucket; ++j) {
            base_sum += offsets[k][j];
          }
        }
        // ...then chains the range totals through pref_sums: wait for the
        // previous thread's running total, add our own, publish downstream
        while (tid != 0 &&
               !pref_sums[tid].finished.load(std::memory_order_acquire)) {
        }  // spin
        base_sum += pref_sums[tid].sum;
        pref_sums[tid + 1].sum = base_sum;
        pref_sums[tid + 1].finished.store(true, std::memory_order_release);
        // scatter the running sum back: offsets[k][j] becomes the position
        // where thread k starts writing bucket j
        uint64_t prev_sum = pref_sums[tid].sum;
        for (size_t j = begin_bucket; j < end_bucket; ++j) {
          msd_begin[j] = prev_sum;
          for (int k = 0;
               k < nthreads && prev_sum < static_cast<uint64_t>(size); ++k) {
            const uint64_t tmp = offsets[k][j] + prev_sum;
            offsets[k][j] = prev_sum;
            prev_sum = tmp;
            assert(offsets[k][j] < static_cast<uint64_t>(size));
          }
          msd_end[j] = prev_sum;
        }
        barrier.wait(tid);
      }
      { // phase 3: scatter keys to their MSD bucket via the write buffer
        for (auto* ptr = from; ptr != to; ++ptr) {
          const uint8_t idx = static_cast<uint8_t>(
              (*ptr >> ((bytes - 1) * 8)) & (nbuckets - 1));
          wbuffer[idx][wbuff_n[idx]++] = *ptr;
          if (wbuff_n[idx] == wbuffer[idx].size()) {
            // buffer holds a full cache line: flush it to the destination
            ::memcpy(&dst[my_offset[idx]], wbuffer[idx].data(),
                     kOutBuff * sizeof(uint64_t));
            my_offset[idx] += kOutBuff;
            wbuff_n[idx] = 0;
          }
        }
        // drain the partially filled buffers
        for (size_t i = 0; i < nbuckets; ++i) {
          for (size_t j = 0; j < wbuff_n[i]; ++j) {
            dst[my_offset[i] + j] = wbuffer[i][j];
          }
          wbuff_n[i] = 0;
          my_offset[i] = 0;
        }
        std::swap(src, dst);  // partitioned data now lives in the scratch buffer
        barrier.wait(tid);
      }
      { // phase 4: claim whole MSD buckets and sort each one from the LSD
        std::array<std::array<uint64_t, nbuckets>, bytes - 1> l_offsets;
        for (;;) {
          // claim the next bucket atomically (a plain load-then-add check
          // could let two threads claim the same bucket)
          const int target = target_bucket.fetch_add(1);
          if (static_cast<size_t>(target) >= nbuckets) break;
          auto* my_begin = &src[msd_begin[target]];
          auto* my_end = &src[msd_end[target]];
          for (size_t i = 0; i < bytes - 1; ++i) {
            for (size_t j = 0; j < nbuckets; ++j) {
              l_offsets[i][j] = 0;
            }
          }
          // one read over the bucket builds the histograms for all 7 LSD
          // passes at once
          for (auto ptr = my_begin; ptr != my_end; ++ptr) {
            for (size_t i = 0; i < bytes - 1; ++i) {
              const auto base = i * 8;
              ++l_offsets[i][(*ptr >> base) & (nbuckets - 1)];
            }
          }
          // turn each histogram into exclusive prefix sums
          for (size_t i = 0; i < bytes - 1; ++i) {
            uint64_t prev_sum = 0;
            auto& offset = l_offsets[i];
            for (size_t j = 0; j < nbuckets; ++j) {
              const uint64_t tmp = offset[j] + prev_sum;
              offset[j] = prev_sum;
              prev_sum = tmp;
              assert(offset[j] <= static_cast<uint64_t>(size));
            }
          }
          // 7 stable counting passes, least significant byte first,
          // ping-ponging this bucket's range between src and dst
          for (size_t i = 0; i < bytes - 1; ++i) {
            auto& offset = l_offsets[i];
            const auto base = i * 8;
            uint64_t* const start = &src[msd_begin[target]];
            uint64_t* const finish = &src[msd_end[target]];
            uint64_t* const out = &dst[msd_begin[target]];
            for (uint64_t* pos = start; pos != finish; ++pos) {
              const uint8_t bucket =
                  static_cast<uint8_t>((*pos >> base) & (nbuckets - 1));
              wbuffer[bucket][wbuff_n[bucket]++] = *pos;
              if (wbuff_n[bucket] == wbuffer[bucket].size()) {
                // buffer holds a full cache line: flush it
                ::memcpy(&out[offset[bucket]], wbuffer[bucket].data(),
                         sizeof(uint64_t) * kOutBuff);
                offset[bucket] += kOutBuff;
                wbuff_n[bucket] = 0;
              }
            }
            // drain the partially filled buffers
            for (size_t j = 0; j < nbuckets; ++j) {
              for (size_t k = 0; k < wbuff_n[j]; ++k) {
                out[offset[j]++] = wbuffer[j][k];
              }
              wbuff_n[j] = 0;
            }
            std::swap(src, dst);
          }
          // an odd number of passes leaves the sorted bucket back in the
          // input array; swap once more so src again points at the scratch
          // buffer, which still holds the unprocessed MSD-partitioned buckets
          std::swap(src, dst);
        }
      }
    };
    if (tid != 0) {
      workers.emplace_back(work);
    } else {
      work();  // thread 0 does its share on the calling thread
    }
  }
  for (auto& worker : workers) {
    worker.join();
  }
}
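
A minimal usage sketch (assuming the helper definitions above): fill a vector with random 64-bit keys, sort it in place, and check the result.

#include <cstdio>
#include <random>

int main() {
  std::vector<uint64_t> data(1 << 20);
  std::mt19937_64 rng(42);
  for (auto& v : data) {
    v = rng();
  }
  msd_first_parallel_radix_sort(data.begin(), data.end());
  std::printf("sorted: %s\n",
              std::is_sorted(data.begin(), data.end()) ? "yes" : "no");
  return 0;
}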