
@dhollman
Last active July 20, 2018 13:59
Standard Algorithms with Executors

Standard Algorithms Interaction with Different Executors Proposals

Disclaimers

Herein I'll use some hand-wavy implementation of the widely-suggested policy.on(executor) where, for instance, par.on(ex) returns something that matches a concept called ParallelPolicyOn<Executor>. I'll also be hand-wavy about a bunch of other things (e.g., ignoring the existence of ranges), but if they become a distraction (or if someone thinks something I've ignored is relevant) I'll be happy to change them. I'm also using compact concept notation for brevity.
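
To make the hand-waving a bit more concrete, here is a minimal sketch of one possible shape for `par.on(ex)`. Everything in it is an assumption for illustration: the names `parallel_policy_on`, `is_parallel_policy_on`, and `inline_executor` are hypothetical, the trait is a C++17 stand-in for the `ParallelPolicyOn<Executor>` concept, and none of it is taken from P0443, P1054, or P1055.

```cpp
#include <type_traits>
#include <utility>

namespace sketch {

// Hypothetical: a parallel policy bound to one specific executor.
template <class Executor>
class parallel_policy_on {
public:
  explicit parallel_policy_on(Executor ex) : ex_(std::move(ex)) {}
  const Executor& executor() const { return ex_; }
private:
  Executor ex_;
};

// Hypothetical stand-in for a parallel policy type that grows an on() member.
struct parallel_policy {
  template <class Executor>
  parallel_policy_on<std::decay_t<Executor>> on(Executor&& ex) const {
    return parallel_policy_on<std::decay_t<Executor>>(std::forward<Executor>(ex));
  }
};
inline constexpr parallel_policy par{};

// Trait playing the role of the ParallelPolicyOn<Executor> concept.
template <class Policy, class Executor>
struct is_parallel_policy_on : std::false_type {};
template <class Executor>
struct is_parallel_policy_on<parallel_policy_on<Executor>, Executor> : std::true_type {};

// Toy executor that runs work inline; only here to exercise the sketch.
struct inline_executor {
  template <class F>
  void execute(F&& f) const { std::forward<F>(f)(); }
};

} // namespace sketch
```

With this, `sketch::par.on(sketch::inline_executor{})` yields an object whose `executor()` member is what the `sort` overloads below call as `policy.executor()`.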

An exploration of std::sort

P0443 + P1054

Oversimplified, strawman implementation of std::sort using P0443+P1054. The second version, below this, is more analogous to what I've tried (but failed) to write using P1055-like semantics.

P0443+P1054 Authors: Is there another way to express some of the hand-wavy "missing" stuff here using what we have?

// assumptions: swap and/or comp may be expensive compared to task creation,
// in which case we do not want to be lazy (or don't want to be lazy until
// a certain depth is reached) in order to take advantage of available parallelism
auto _par_qs_impl(Executor ex, RandomIt data, size_t lo, size_t hi, Compare comp) {
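  // base case: the inclusive range [lo, hi] has one element (lo == hi) or is
  // empty (lo == hi + 1); this relies on unsigned wraparound, so it also
  // holds when a caller passes p-1 with p == 0
  if(lo - hi <= 1) {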
    // P0443+P1054 needs something like this:
    return execution::query(ex, ready_future_factory<void>).make_ready_future(); 
  }
  auto p = ex.twoway_execute([data,lo,hi,comp]{
    // copied from Wikipedia, because I didn't feel like thinking about it
    auto& pivot = data[hi];
    auto i = lo - 1; 
    for(size_t j = lo; j < hi; ++j) {
      if(comp(data[j], pivot)) std::swap(data[++i], data[j]);
    }
    std::swap(data[i+1], data[hi]);
    return i + 1;
  }).share(); // share is hand-wavy; not all futures will necessarily support it,
              // so something that returns a shared future should probably be a
              // primitive of the model somewhere...
  auto lo_done = p.then([ex,data,lo,comp](size_t p) {
    // this may need to include some sort of "unwrap" notation, depending on what
    // P1054 decides
    return _par_qs_impl(ex, data, lo, p-1, comp);
  });
  auto hi_done = p.then([ex,data,hi,comp](size_t p) {
    // this may also need some sort of unwrap notation
    return _par_qs_impl(ex, data, p+1, hi, comp);
  });
  // when_all is still part of unresolved additions to P1054,
  // so we'd have to do something like:
  return lo_done.then([hi_done=std::move(hi_done)]() mutable {
    // still need some sort of unwrap, possibly
    return std::move(hi_done);
  });
}

void sort(ParallelPolicyOn<Executor>&& policy, RandomIt first, RandomIt last, Compare comp) {
  if(first == last) return;
  std::this_thread::future_get(_par_qs_impl(policy.executor(), first, 0, last-first-1, comp));
}

Or another version that minimizes extra task creation at the expense of exposing less of the tasking structure to the executor:

// most of the same comments from the above version still apply
auto _par_qs_impl(Executor ex, RandomIt data, size_t lo, size_t hi, Compare comp) {
  if(lo - hi <= 1) {
    return execution::query(ex, ready_future_factory<void>).make_ready_future(); 
  }
  auto& pivot = data[hi];
  auto p = lo - 1; 
  for(size_t j = lo; j < hi; ++j) {
    if(comp(data[j], pivot)) std::swap(data[++p], data[j]);
  }
  std::swap(data[++p], data[hi]);
  auto lo_done = ex.twoway_execute([ex,data,lo,comp,p] {
    // still needs some unwrapping, except this time twoway needs to be involved
    return _par_qs_impl(ex, data, lo, p-1, comp);
  });
  auto hi_done = _par_qs_impl(ex, data, p+1, hi, comp);
  return lo_done.then([hi_done=std::move(hi_done)]() mutable {
    return std::move(hi_done);
  });
}

void sort(ParallelPolicyOn<Executor>&& policy, RandomIt first, RandomIt last, Compare comp) {
  if(first == last) return;
  std::this_thread::future_get(_par_qs_impl(policy.executor(), first, 0, last-first-1, comp));
}
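
For comparison, the structure of this second version (partition inline, fork one half, recurse on the other, then join) can be tried with today's standard library. The following is a minimal sketch under stated assumptions, not an implementation of either proposal: `std::async` stands in for `ex.twoway_execute`, a blocking `get()` replaces the `then` chaining, indices are signed rather than `size_t`, and the names `par_qs_sketch`, `sort_sketch`, and the `depth` cutoff are hypothetical.

```cpp
#include <algorithm>
#include <cstddef>
#include <functional>
#include <future>
#include <utility>
#include <vector>

// Sorts the inclusive index range [lo, hi] of data.
// depth bounds how many recursion levels fork a task; below it we run inline.
template <class RandomIt, class Compare>
void par_qs_sketch(RandomIt data, std::ptrdiff_t lo, std::ptrdiff_t hi,
                   Compare comp, int depth) {
  if (lo >= hi) return;  // empty or single-element range
  // Lomuto partition around the last element, run inline by the caller.
  std::ptrdiff_t p = lo;
  for (std::ptrdiff_t j = lo; j < hi; ++j) {
    if (comp(data[j], data[hi])) std::swap(data[p++], data[j]);
  }
  std::swap(data[p], data[hi]);
  if (depth > 0) {
    // Fork the lower half; std::async is the stand-in for twoway_execute.
    auto lo_done = std::async(std::launch::async, [=] {
      par_qs_sketch(data, lo, p - 1, comp, depth - 1);
    });
    par_qs_sketch(data, p + 1, hi, comp, depth - 1);
    lo_done.get();  // blocking join instead of a then() continuation
  } else {
    par_qs_sketch(data, lo, p - 1, comp, 0);
    par_qs_sketch(data, p + 1, hi, comp, 0);
  }
}

template <class RandomIt, class Compare = std::less<>>
void sort_sketch(RandomIt first, RandomIt last, Compare comp = {}, int depth = 3) {
  if (first == last) return;
  par_qs_sketch(first, 0, (last - first) - 1, comp, depth);
}
```

The blocking join is exactly what the future-based listings try to avoid: here every forking level parks a thread in `get()`, whereas returning a future lets the executor see the dependency without tying up a worker.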

P1055

Updated with input from Kirk:

auto _par_qs_impl(RandomIt data, size_t lo, size_t hi, Compare comp) {
  return [data,lo,hi,comp](auto ex){
    return mi::none_deferred{[ex,data,lo,hi,comp](auto out){
      if(lo - hi <= 1) {
        op::empty() | op::submit(out);
      } else {
        ex | 
        op::transform([data,lo,hi,comp](auto ex){
          auto& pivot = data[hi];
          auto p = lo - 1;
          for(size_t j = lo; j < hi; ++j) {
            if(comp(data[j], pivot)) std::swap(data[++p], data[j]);
          }
          std::swap(data[++p], data[hi]);
          return std::tuple{
            ex | _par_qs_impl(data, lo, p-1, comp), 
            ex | _par_qs_impl(data, p+1, hi, comp)}; 
        }) |
        op::zip(ex) | 
        op::submit(out);
      }
    }};
  };
}

void sort(ParallelPolicyOn<Executor>&& policy, RandomIt first, RandomIt last, Compare comp) {
  if(first == last) return;
  policy.executor() |
    _par_qs_impl(first, 0, last-first-1, comp) |
    op::blocking_submit();
}
@RedBeard0531

I took a crack at writing up a sort using the API I proposed in https://gist.github.com/RedBeard0531/600d5389e1126d3ff395dbc87d07da3e. It is untested so there are probably bugs, but if it was written correctly I think it would have O(N) wall clock runtime if given sufficient parallelism.

int round_up_div(int a, int b) {
    return (a + (b - 1)) / b;
}

Future<void> par_merge_sort(BulkExec* exec, int* begin, int* end) {
    constexpr static size_t kSimpleSortSize = 16; // Just do a simple sort at and below this size.
    struct SimpleSortTask {
        int* begin;
        int* end;

        using Shape = int;
        Shape shape() {
            return round_up_div(end - begin, kSimpleSortSize);
        }

        void run(Shape i) {
            auto myBegin = begin + i * kSimpleSortSize;
            auto myEnd = std::min(end, myBegin + kSimpleSortSize);
            std::sort(myBegin, myEnd);
        }

        void finalize() {}
    };

    struct MergeTask {
        BulkExec* exec; 
        int* begin;
        int* end;
        int strideLen;

        using Shape = int;
        Shape shape() {
            return round_up_div(end - begin, strideLen * 2);
        }

        void run(Shape i) {
            auto myBegin = begin + i * (strideLen * 2);
            auto myMid = std::min(end, myBegin + strideLen);
            auto myEnd = std::min(end, myMid + strideLen);
            if (myMid == myEnd) return; // Already sorted.
            std::inplace_merge(myBegin, myMid, myEnd);
        }

        Future<void> finalize() {
            if (shape() <= 1)
                return Future<void>::makeReady();
            return exec->exec(MergeTask{exec, begin, end, strideLen * 2});
            // could also be `strideLen *= 2; exec->exec(*this);` but I find it clearer to make a new task.
        }
    };

    return exec->exec(SimpleSortTask{begin, end}).then([=] {
        return exec->exec(MergeTask{exec, begin, end, kSimpleSortSize});
    });
}
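
The index arithmetic in `SimpleSortTask::run` and `MergeTask::run` can be checked without any executor. Below is a minimal serial sketch, assuming a plain loop in place of each bulk launch; `bulk_for_each_index` and `bottom_up_merge_sort` are hypothetical names, and the code uses indices rather than raw pointers so that no pointer is ever formed past the end of the data.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical serial stand-in for one bulk launch: run(i) for i in [0, shape).
template <class F>
void bulk_for_each_index(std::ptrdiff_t shape, F run) {
  for (std::ptrdiff_t i = 0; i < shape; ++i) run(i);
}

inline std::ptrdiff_t round_up_div(std::ptrdiff_t a, std::ptrdiff_t b) {
  return (a + (b - 1)) / b;
}

inline void bottom_up_merge_sort(std::vector<int>& v) {
  const std::ptrdiff_t n = static_cast<std::ptrdiff_t>(v.size());
  const std::ptrdiff_t kSimpleSortSize = 16;
  // Pass 0: sort each block of kSimpleSortSize elements independently.
  bulk_for_each_index(round_up_div(n, kSimpleSortSize), [&](std::ptrdiff_t i) {
    const std::ptrdiff_t b = i * kSimpleSortSize;
    const std::ptrdiff_t e = std::min(n, b + kSimpleSortSize);
    std::sort(v.begin() + b, v.begin() + e);
  });
  // Later passes: merge adjacent sorted runs of length stride, doubling it
  // after each pass until a single run covers the whole vector.
  for (std::ptrdiff_t stride = kSimpleSortSize; stride < n; stride *= 2) {
    bulk_for_each_index(round_up_div(n, stride * 2), [&](std::ptrdiff_t i) {
      const std::ptrdiff_t b = i * stride * 2;
      const std::ptrdiff_t m = std::min(n, b + stride);
      const std::ptrdiff_t e = std::min(n, m + stride);
      if (m == e) return;  // lone trailing run, already sorted
      std::inplace_merge(v.begin() + b, v.begin() + m, v.begin() + e);
    });
  }
}
```

Each call to `bulk_for_each_index` corresponds to one bulk task whose indices are independent of each other. The final pass is a single merge over all elements, which is consistent with the O(N) wall-clock estimate above as long as `std::inplace_merge` can obtain its temporary buffer.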
