Created
February 1, 2016 15:18
-
-
Save kumagi/19ea96a6ea25c7532f47 to your computer and use it in GitHub Desktop.
parallel
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
void msd_first_parallel_radix_sort(const std::vector<uint64_t>::iterator& begin, | |
const std::vector<uint64_t>::iterator& end) { | |
const auto size = std::distance(begin, end); | |
const int nthreads = cpu_num(); // number of threads | |
constexpr size_t bytes = sizeof(uint64_t); | |
constexpr size_t nbuckets = 256; | |
constexpr int kOutBuff = 8; | |
std::vector<std::array<uint64_t, nbuckets> > offsets(nthreads); | |
for (int i = 0; i < nthreads; ++i) { | |
for (int j = 0; j < nbuckets; ++j) { | |
offsets[i][j] = 0; | |
} | |
} | |
// barriers | |
spin_barrier barrier(nthreads); | |
std::unique_ptr<uint64_t[]> buff(new uint64_t[size]); | |
std::vector<local_sum> pref_sums(nthreads + 1); | |
std::array<uint64_t, nbuckets> msd_begin; | |
std::array<uint64_t, nbuckets> msd_end; | |
std::vector<std::thread> workers; | |
workers.reserve(nthreads); | |
std::atomic<int> target_bucket = {0}; | |
for (int tid = nthreads - 1; 0 <= tid; --tid) { | |
auto work = [&, tid] { | |
set_affinity(tid); | |
uint64_t *src = &*begin; | |
uint64_t *dst = buff.get(); | |
uint64_t * const from = &src[size * tid / nthreads]; | |
uint64_t * const to = &src[size * (tid + 1) / nthreads]; | |
char working_space[nbuckets * 8 * kOutBuff + 63]; | |
auto& wbuffer | |
= *reinterpret_cast<std::array<std::array<uint64_t, kOutBuff>, nbuckets>*> | |
((reinterpret_cast<intptr_t>(working_space) + 63) & ~63llu); | |
std::array<uint8_t, nbuckets> wbuff_n = {0}; | |
std::array<uint64_t, nbuckets>& my_offset = offsets[tid]; | |
{ | |
for (auto* ptr = from; ptr != to; ++ptr) { | |
// count by msd | |
++my_offset[(*ptr >> ((bytes - 1) * 8)) & (nbuckets - 1)]; | |
} | |
} | |
barrier.wait(tid); | |
{ // prefix sum | |
uint64_t base_sum = 0; | |
const size_t begin_bucket = nbuckets * tid / nthreads; | |
const size_t end_bucket = nbuckets * (tid + 1) / nthreads; | |
for (size_t k = 0; k < nthreads; ++k) { | |
for (size_t j = begin_bucket; j < end_bucket; ++j) { | |
base_sum += offsets[k][j]; | |
} | |
} | |
while (!pref_sums[tid].finished.load(std::memory_order_acquire) && | |
tid != 0) { } // spin | |
base_sum += pref_sums[tid].sum; | |
pref_sums[tid + 1].sum = base_sum; | |
pref_sums[tid + 1].finished.store(true, std::memory_order_release); | |
uint64_t prev_sum = pref_sums[tid].sum; | |
for (size_t j = begin_bucket; j < end_bucket; ++j) { | |
msd_begin[j] = prev_sum; | |
for (size_t k = 0; k < nthreads && prev_sum < size; ++k) { | |
const uint64_t tmp = offsets[k][j] + prev_sum; | |
offsets[k][j] = prev_sum; | |
prev_sum = tmp; | |
assert(offsets[k][j] < size); | |
} | |
msd_end[j] = prev_sum; | |
} | |
barrier.wait(tid); | |
} | |
{ // move by msd | |
for (auto* ptr = from; ptr != to; ++ptr) { | |
const uint8_t idx = static_cast<uint8_t>( | |
(*ptr >> ((bytes - 1) * 8)) & (nbuckets - 1)); | |
wbuffer[idx][wbuff_n[idx]++] = *ptr; | |
if (wbuff_n[idx] == wbuffer[idx].size()) { | |
// reached cache line size | |
::memcpy(&dst[my_offset[idx]], | |
wbuffer[idx].data(), | |
kOutBuff * sizeof(uint64_t)); | |
my_offset[idx] += kOutBuff; | |
wbuff_n[idx] = 0; | |
} | |
} | |
for (size_t i = 0; i < nbuckets; ++i) { | |
for (size_t j = 0; j < wbuff_n[i]; ++j) { | |
dst[my_offset[i] + j] = std::move(wbuffer[i][j]); | |
} | |
wbuff_n[i] = 0; | |
my_offset[i] = 0; | |
} | |
std::swap(src, dst); | |
barrier.wait(tid); | |
} | |
{ // sort each bucket from lsd | |
std::array<std::array<uint64_t, nbuckets>, bytes - 1> l_offsets; | |
while (target_bucket.load() < nbuckets) { | |
const int target = target_bucket.fetch_add(1); | |
auto* my_begin = &src[msd_begin[target]]; | |
auto* my_end = &src[msd_end[target]]; | |
for (size_t i = 0; i < bytes - 1; ++i) { | |
for (size_t j = 0; j < nbuckets; ++j) { | |
l_offsets[i][j] = 0; | |
} | |
} | |
for (auto ptr = my_begin; ptr != my_end; ++ptr) { | |
for (size_t i = 0; i < bytes - 1; ++i) { | |
const auto base = i * 8; | |
++l_offsets[i][*ptr >> base & (nbuckets - 1)]; | |
} | |
} | |
for (size_t i = 0; i < bytes - 1; ++i) { | |
uint64_t prev_sum = 0; | |
auto& offset = l_offsets[i]; | |
for (size_t j = 0; j < nbuckets; ++j) { | |
const uint64_t tmp = offset[j] + prev_sum; | |
offset[j] = prev_sum; | |
prev_sum = tmp; | |
assert(l_offset[j] <= size); | |
} | |
} | |
for (size_t i = 0; i < bytes - 1; ++i) { | |
auto& offset = l_offsets[i]; | |
const auto base = i * 8; | |
uint64_t* const start = &src[msd_begin[target]]; | |
uint64_t* const finish = &src[msd_end[target]]; | |
uint64_t* const out = &dst[msd_begin[target]]; | |
for (uint64_t* pos = start; pos != finish; ++pos) { | |
const uint8_t bucket = static_cast<uint8_t>((*pos >> base) & 255); | |
wbuffer[bucket][wbuff_n[bucket]++] = std::move(*pos); | |
if (wbuff_n[bucket] == wbuffer[bucket].size()) { | |
// reached cache line size | |
::memcpy(&out[offset[bucket]], | |
&wbuffer[bucket][0], | |
sizeof(uint64_t) * kOutBuff); | |
offset[bucket] += kOutBuff; | |
wbuff_n[bucket] = 0; | |
} | |
} | |
for (size_t j = 0; j < nbuckets; ++j) { | |
for (size_t k = 0; k < wbuff_n[j]; ++k) { | |
out[offset[j]++] = std::move(wbuffer[j][k]); | |
} | |
wbuff_n[j] = 0; | |
} | |
std::swap(src, dst); | |
} | |
std::swap(src, dst); | |
} | |
} | |
}; | |
if (tid != 0) { | |
workers.emplace_back(work); | |
} else { | |
work(); | |
} | |
} | |
for (auto &worker : workers) { | |
worker.join(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment