@dhollman
Last active July 20, 2018 13:59
Standard Algorithms with Executors

How the standard algorithms interact with the different executors proposals

Disclaimers

Herein I'll use some hand-wavy implementation of the widely-suggested policy.on(executor) where, for instance, par.on(ex) returns something that matches a concept called ParallelPolicyOn<Executor>. I'll also be hand-wavy about a bunch of other things (e.g., ignoring the existence of ranges), but if they become a distraction (or if someone thinks something I've ignored is relevant) I'll be happy to change them. I'm also using compact concept notation for brevity.
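
For concreteness, here is a minimal sketch of the policy.on(executor) shape assumed below. All of these names are hypothetical (this is not any proposal's actual wording); parallel_policy_on<Executor> is intended to model the ParallelPolicyOn<Executor> concept used in the sort signatures:

#include <utility>

template <class Executor>
struct parallel_policy_on {
  Executor ex_;
  Executor executor() const { return ex_; } // executors are assumed cheap to copy
};

struct parallel_policy {
  template <class Executor>
  parallel_policy_on<Executor> on(Executor ex) const { return {std::move(ex)}; }
};

inline constexpr parallel_policy par{};

// usage: sort(par.on(my_executor), first, last, comp);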

An exploration of std::sort

P0443 + P1054

Oversimplified, strawman implementation of std::sort using P0443+P1054. The second version, below this, is more analogous to what I've tried (but failed) to write using P1055-like semantics.

P0443+P1054 Authors: Is there another way to express some of the hand-wavy "missing" stuff here using what we have?

// assumptions: swap and/or comp may be expensive compared to task creation,
// in which case we do not want to be lazy (or don't want to be lazy until
// a certain depth is reached) in order to take advantage of available parallelism
auto _par_qs_impl(Executor ex, RandomIt data, size_t lo, size_t hi, Compare comp) {
  if(hi <= lo || hi == size_t(-1)) { // base case; hi wraps to size_t(-1) when the p-1 recursion hits p == 0
    // P0443+P1054 needs something like this:
    return execution::query(ex, ready_future_factory<void>).make_ready_future(); 
  }
  auto p = ex.twoway_execute([data,lo,hi,comp]{
    // copied from Wikipedia, because I didn't feel like thinking about it
    auto& pivot = data[hi];
    auto i = lo - 1; // unsigned wraparound when lo == 0 is fine: the first ++i brings it back to 0
    for(size_t j = lo; j < hi; ++j) {
      if(comp(data[j], pivot)) std::swap(data[++i], data[j]);
    }
    std::swap(data[i+1], data[hi]);
    return i + 1;
  }).share(); // share is hand-wavy; not all futures will necessarily support it,
              // so something that returns a shared future should probably be a
              // primitive of the model somewhere...
  auto lo_done = p.then([ex,data,lo,comp](size_t part) { // "part" avoids shadowing the shared future p
    // this may need to include some sort of "unwrap" notation, depending on what
    // p1054 decides
    return _par_qs_impl(ex, data, lo, part-1, comp);
  });
  auto hi_done = p.then([ex,data,hi,comp](size_t part) {
    // this may also need some sort of unwrap notation
    return _par_qs_impl(ex, data, part+1, hi, comp);
  });
  // when_all is still part of unresolved additions to P1054,
  // so we'd have to do something like:
  return lo_done.then([hi_done = std::move(hi_done)]() mutable {
    // still need some sort of unwrap, possibly
    return std::move(hi_done);
  });
}

void sort(ParallelPolicyOn<Executor>&& policy, RandomIt first, RandomIt last, Compare comp) {
  if(first == last) return;
  std::this_thread::future_get(_par_qs_impl(policy.executor(), first, 0, last-first-1, comp));
}
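
(If/when the when_all mentioned in the comment inside _par_qs_impl lands in P1054, the final join could presumably collapse to something like the following; the spelling here is hypothetical:)

  return execution::when_all(std::move(lo_done), std::move(hi_done))
      .then([]{ /* both halves are sorted */ });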

Or another version that minimizes extra task creation at the expense of exposing less of the tasking structure to the executor:

// most of the same comments from the above version still apply
auto _par_qs_impl(Executor ex, RandomIt data, size_t lo, size_t hi, Compare comp) {
  if(hi <= lo || hi == size_t(-1)) { // wraparound-safe base case, as in the first version
    return execution::query(ex, ready_future_factory<void>).make_ready_future(); 
  }
  auto& pivot = data[hi];
  auto p = lo - 1; 
  for(size_t j = lo; j < hi; ++j) {
    if(comp(data[j], pivot)) std::swap(data[++p], data[j]);
  }
  std::swap(data[++p], data[hi]);
  auto lo_done = ex.twoway_execute([ex,data,lo,comp,p] {
    // still needs some unwrapping, except this time twoway needs to be involved
    return _par_qs_impl(ex, data, lo, p-1, comp);
  });
  auto hi_done = _par_qs_impl(ex, data, p+1, hi, comp);
  return lo_done.then([hi_done = std::move(hi_done)]() mutable {
    return std::move(hi_done);
  });
}

void sort(ParallelPolicyOn<Executor>&& policy, RandomIt first, RandomIt last, Compare comp) {
  if(first == last) return;
  std::this_thread::future_get(_par_qs_impl(policy.executor(), first, 0, last-first-1, comp));
}

P1055

Updated with input from Kirk:

auto _par_qs_impl(RandomIt data, size_t lo, size_t hi, Compare comp) {
  return [data,lo,hi,comp](auto ex){
    return mi::none_deferred{[ex,data,lo,hi,comp](auto out){
      if(hi <= lo || hi == size_t(-1)) { // wraparound-safe base case
        op::empty() | op::submit(out);
      } else {
        ex | 
        op::transform([data,lo,hi,comp](auto ex){
          auto& pivot = data[hi];
          auto p = lo - 1;
          for(size_t j = lo; j < hi; ++j) {
            if(comp(data[j], pivot)) std::swap(data[++p], data[j]);
          }
          std::swap(data[++p], data[hi]);
          return std::tuple{
            ex | _par_qs_impl(data, lo, p-1, comp), 
            ex | _par_qs_impl(data, p+1, hi, comp)}; 
        }) |
        op::zip(ex) | 
        op::submit(out);
      }
    }};
  };
}

void sort(ParallelPolicyOn<Executor>&& policy, RandomIt first, RandomIt last, Compare comp) {
  if(first == last) return;
  policy.executor() |
    _par_qs_impl(first, 0, last-first-1, comp) |
    op::blocking_submit();
}

@kirkshoop commented Jul 18, 2018

P1055 sort

Here is a P1055 implementation (not compiler verified)

are the operations in the tuple passed to op::just sequenced or concurrent?

It depends on the executor used. If the executor is concurrent, then the operations run concurrently.
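
To illustrate with the hypothetical par.on spelling from the gist body: the executor supplied at the call site is the ex that both recursive halves are piped through, so it alone determines their concurrency. (thread_pool_executor is an invented name for some concurrent executor; inline_executor, as used in the "schedule lo only" version below, runs submitted work inline.)

sort(par.on(thread_pool_executor{}), first, last, comp); // halves may run concurrently
sort(par.on(inline_executor{}), first, last, comp);      // halves run one after the other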

(Updated to move pivot and p to the correct scope.)
(Updated to apply David's changes and to add a version that does not schedule hi.)

Schedule lo & hi

auto _par_qs_impl(RandomIt data, size_t lo, size_t hi, Compare comp) {
  return [data,lo,hi,comp](auto ex){
    return mi::none_deferred{[ex,data,lo,hi,comp](auto out){
      if(hi <= lo || hi == size_t(-1)) { // wraparound-safe base case
        op::empty() | op::submit(out);
      } else {
        ex | 
        op::transform([data,lo,hi,comp](auto ex){
          auto& pivot = data[hi];
          auto p = lo - 1;
          for(size_t j = lo; j < hi; ++j) {
            if(comp(data[j], pivot)) std::swap(data[++p], data[j]);
          }
          std::swap(data[++p], data[hi]);
          return std::tuple{
            ex | _par_qs_impl(data, lo, p-1, comp), 
            ex | _par_qs_impl(data, p+1, hi, comp)}; 
        }) |
        op::zip(ex) | 
        op::submit(out);
      }
    }};
  };
}

void sort(ParallelPolicyOn<Executor>&& policy, RandomIt first, RandomIt last, Compare comp) {
  if(first == last) return;
  policy.executor() |
    _par_qs_impl(first, 0, last-first-1, comp) |
    op::blocking_submit();
}

Schedule lo only

auto _par_qs_impl(RandomIt data, size_t lo, size_t hi, Compare comp) {
  return [data,lo,hi,comp](auto ex){
    return mi::none_deferred{[ex,data,lo,hi,comp](auto out){
      if(hi <= lo || hi == size_t(-1)) { // wraparound-safe base case
        op::empty() | op::submit(out);
      } else {
        ex | 
        op::transform([data,lo,hi,comp](auto ex){
          auto& pivot = data[hi];
          auto p = lo - 1;
          for(size_t j = lo; j < hi; ++j) {
            if(comp(data[j], pivot)) std::swap(data[++p], data[j]);
          }
          std::swap(data[++p], data[hi]);
          return std::tuple{
            ex | _par_qs_impl(data, lo, p-1, comp), 
            inline_executor{} | _par_qs_impl(data, p+1, hi, comp)}; 
        }) |
        op::zip(ex) | 
        op::submit(out);
      }
    }};
  };
}

void sort(ParallelPolicyOn<Executor>&& policy, RandomIt first, RandomIt last, Compare comp) {
  if(first == last) return;
  policy.executor() |
    _par_qs_impl(first, 0, last-first-1, comp) |
    op::blocking_submit();
}

@dhollman (author) commented:

I don't know for sure, but I think this has to be wrong. The recursive calls to _par_qs_impl() need to depend on the result of the pivot computation (that's where the arguments p-1 and p+1 come from).

I have more questions too, but let me think about them in the context of whatever changes this comment induces.

@kirkshoop commented:

Yep, I got that wrong; I moved them to the enclosing scope. Better?

@dhollman (author) commented:

Umm... no. The value of p after the pivot computation needs to be used; moving the whole pivot computation to the enclosing scope would do it:

auto _par_qs_impl(RandomIt data, size_t lo, size_t hi, Compare comp) {
  return [data,lo,hi,comp](auto ex){
    return mi::none_deferred{[ex,data,lo,hi,comp](auto out){
      if(hi <= lo || hi == size_t(-1)) { // wraparound-safe base case
        op::empty() | op::submit(out);
      } else {
        auto& pivot = data[hi];
        auto p = lo - 1;
        for(size_t j = lo; j < hi; ++j) {
          if(comp(data[j], pivot)) std::swap(data[++p], data[j]);
        }
        std::swap(data[++p], data[hi]);
        op::just(std::tuple{
          ex | _par_qs_impl(data, lo, p-1, comp), 
          ex | _par_qs_impl(data, p+1, hi, comp)}) |
        op::zip(ex) | 
        op::submit(out);
      }
    }};
  };
}

void sort(ParallelPolicyOn<Executor>&& policy, RandomIt first, RandomIt last, Compare comp) {
  if(first == last) return;
  policy.executor() |
    _par_qs_impl(first, 0, last-first-1, comp) |
    op::blocking_submit();
}

But this kind of raises another question: are the operations in the tuple passed to op::just sequenced or concurrent? Also, what does this look like with the second approach, the one that avoids creating an extra execution agent for the high part, as in the second code snippet from the 443 section of the gist?

@dhollman (author) commented:

Okay, a few more questions:


policy.executor() | _par_qs_impl(first, 0, last-first-1, comp) | op::blocking_submit();

Why isn't op::transform() used here for the middle part of the pipeline?


For the schedule lo only version, could I also have written it like this?

void _par_qs_impl(Executor ex, RandomIt data, size_t lo, size_t hi, Compare comp, auto out) {
  if(hi <= lo || hi == size_t(-1)) { // wraparound-safe base case
    op::empty() | op::submit(out);
  } else {
    auto& pivot = data[hi];
    auto p = lo - 1;
    for(size_t j = lo; j < hi; ++j) {
      if(comp(data[j], pivot)) std::swap(data[++p], data[j]);
    }
    std::swap(data[++p], data[hi]);

    auto lo_part = mi::none_deferred{[ex,data,lo,p,comp](auto out) {
      _par_qs_impl(ex, data, lo, p-1, comp, out);
    }}; 

    _par_qs_impl(ex, data, p+1, hi, comp, out);

    lo_part | op::submit(ex);
  }
}

void sort(ParallelPolicyOn<Executor>&& policy, RandomIt first, RandomIt last, Compare comp) {
  if(first == last) return;
  policy.executor() |
    mi::none_deferred{[ex=policy.executor(),first,last,comp](auto out){
      _par_qs_impl(ex, first, 0, last-first-1, comp, out);
    }} |
    op::blocking_submit();
}

I'm just trying to wrap my mind around what it looks like to continue using the same stack frame after scheduling a deferred operation (whether eagerly or lazily).


Finally, for the lazy lo and hi version, what are the implications for the stack? If submit(out) is where all of the work happens or gets scheduled, wouldn't this lead to stack overflow issues? (I'm pretty sure some executors in the 443 version of this have that problem too, but I don't think it's as likely to be a problem "by default", so to speak.)

@kirkshoop commented Jul 18, 2018

(updated executor usage discussion #2)


Why isn't op::transform() used here for the middle part of the pipeline?

_par_qs_impl is an adaptor/operator, not a value transform function. (It returns a function that takes a sender/executor/future and returns a sender/executor/future.)
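
Restated with the pipeline from the gist body unrolled (no new names, just the same pieces):

auto adaptor = _par_qs_impl(first, 0, last-first-1, comp); // a callable: sender -> sender
auto sender = policy.executor() | adaptor;                 // equivalent to adaptor(policy.executor())
sender | op::blocking_submit();                            // the work is only submitted here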


For the schedule lo only version, could I also have written it like this?

Not quite.

tl;dr: with changes this appears to work, but I would not expect it to pass code review. It breaks up the expression of the dependency graph in ways that make it hard to reason about, even when you know how the concepts and adaptors/operators interact.

  policy.executor() |
    mi::none_deferred{[ex=policy.executor(),first,last,comp](auto out){
      _par_qs_impl(ex, first, 0, last-first-1, comp, out);
    }} |
    op::blocking_submit();

none_deferred is not an adaptor; it is a sender/executor/future. It could be used in place of policy.executor(), but pipe cannot compose two senders. Try:

    mi::none_deferred{[ex=policy.executor(),first,last,comp](auto out){
      _par_qs_impl(ex, first, 0, last-first-1, comp, out);
    }} |
    op::blocking_submit();

lo_part | op::submit(ex);

submit takes a receiver/promise, but ex is a sender/executor/future; this will not use the executor to schedule the lo part. Try:

(updated this code example)

ex | op::submit([lo_part = std::move(lo_part)](auto) mutable { lo_part | op::submit(); });

If submit(out) is where all of the work happens or gets scheduled, wouldn't this lead to stack overflow issues?

If the executor is concurrent, there are no stack overflow issues.
If the executor is inline (and does not trampoline), yes, there are stack overflow issues.
A trampoline is a basic building block for executors (it turns an inline executor into a concurrent one).
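
To make "trampoline" concrete, here is a minimal sketch of the idea (hypothetical; this is not pushmi's actual trampoline type): the outermost submit on a thread runs a drain loop, and nested submits enqueue instead of recursing, so recursive algorithms like the quicksorts above get bounded stack depth.

#include <deque>
#include <functional>

struct trampoline_executor {
  // Non-null only while a drain loop is running on the current thread.
  static inline thread_local std::deque<std::function<void()>>* queue_ = nullptr;

  template <class F>
  void submit(F f) {
    if (queue_) {                         // nested submit: defer, don't recurse
      queue_->push_back(std::move(f));
      return;
    }
    std::deque<std::function<void()>> q;  // outermost submit: own the queue
    queue_ = &q;
    f();
    while (!q.empty()) {
      auto g = std::move(q.front());
      q.pop_front();
      g();
    }
    queue_ = nullptr;
  }
};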

@RedBeard0531 commented:

I took a crack at writing up a sort using the API I proposed in https://gist.github.com/RedBeard0531/600d5389e1126d3ff395dbc87d07da3e. It is untested, so there are probably bugs, but if I've written it correctly, I think it would have O(N) wall-clock runtime given sufficient parallelism.
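
(For what it's worth, the O(N) wall-clock claim plausibly falls out of the critical path: with enough workers, merge level k performs its independent std::inplace_merge calls over blocks of size 16·2^k concurrently, so the span is roughly 16 + 32 + 64 + ... + N ≈ 2N = O(N), even though the total work is still O(N log N).)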

int round_up_div(int a, int b) {
    return (a + (b - 1)) / b;
}

Future<void> par_merge_sort(BulkExec* exec, int* begin, int* end) {
    constexpr static size_t kSimpleSortSize = 16; // Just do a simple sort at and below this size.
    struct SimpleSortTask {
        int* begin;
        int* end;

        using Shape = int;
        Shape shape() {
            return round_up_div(end - begin, kSimpleSortSize);
        }

        void run(Shape i) {
            auto myBegin = begin + i * kSimpleSortSize;
            auto myEnd = std::min(end, myBegin + kSimpleSortSize);
            std::sort(myBegin, myEnd);
        }

        void finalize() {}
    };

    struct MergeTask {
        BulkExec* exec; 
        int* begin;
        int* end;
        int strideLen;

        using Shape = int;
        Shape shape() {
            return round_up_div(end - begin, strideLen * 2);
        }

        void run(Shape i) {
            auto myBegin = begin + i * (strideLen * 2);
            auto myMid = std::min(end, myBegin + strideLen);
            auto myEnd = std::min(end, myMid + strideLen);
            if (myMid == myEnd) return; // Already sorted.
            std::inplace_merge(myBegin, myMid, myEnd);
        }

        Future<void> finalize() {
            if (shape() <= 1)
                return Future<void>::makeReady();
            return exec->exec(MergeTask{exec, begin, end, strideLen * 2});
            // could also be `strideLen *= 2; return exec->exec(*this);`, but I find it clearer to make a new task.
        }
    };

    return exec->exec(SimpleSortTask{begin, end}).then([=] {
        return exec->exec(MergeTask{exec, begin, end, kSimpleSortSize});
    });
}
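
A hypothetical call site, assuming some thread-pool implementation of the proposed BulkExec interface (ThreadPoolBulkExec is an invented name here) and a blocking get() on the proposed Future:

#include <thread>
#include <vector>

std::vector<int> v = {9, 4, 7, 1, 3, 8, 2};
ThreadPoolBulkExec pool{std::thread::hardware_concurrency()};
par_merge_sort(&pool, v.data(), v.data() + v.size()).get();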
