@SAtacker

SAtacker/work.md Secret

Last active December 20, 2022 07:42
HPX, GSoC 2022 work description


1. GSoC 2022 Report

GSoC @ HPX, STE||AR Group banner

2. About the project

GSoC Project Acceptance Page

Mentors

  • Dr. Hartmut Kaiser
  • Giannis Gonidelis

3. Objective

The main objective of this project was to enable the use of co_await, or, put simply, to make coroutines usable with the latest specification of Senders and Receivers.

4. Abstract

HPX keeps up to date with the Standard C++ proposals, so Senders/Receivers were implemented as per P2300. However, they have been missing coroutine (co_await) integration and some minor functionality described in P2300, which is likely to be accepted. Hence I plan to implement this functionality within the core HPX library.

  • Benefits:
    • Coroutines lead to better async code. For example, the code is more readable, and local variables have the same lifetime as the coroutine, which means we don't need to worry about allocating and releasing them.
    • S/R algorithms can work with coroutines, which they cannot do as of now without relying on futures, and futures are single-use.
    • Adding co_await support makes concurrent code more structured. The same can be done with library abstractions over callbacks, but co_await may allow more optimized code.

5. Brief Summary

  • Senders and Receivers

    • They make for a more consistent programming model across the kinds of async programming, i.e. parallelism and concurrency. They standardize the terminology and execution policies, which become more generic and less redundant.
    • There is a direct connection between senders and coroutine awaitables.
    • If you are familiar with Eric's talk on live senders/receivers, you might be comfortable with these diagrams.
  • Futures

    • One of the points of S/R is to avoid the allocations associated with futures, also, futures are single-use, whereas S/R, in general, can be used (started) multiple times. - Dr. H. Kaiser

The goal is to enable the following for all sender CPOs:

  • If we write a sender and pass it to a function that is a coroutine, the coroutine can co_await that sender and get its result.
  • If senders are not awaitable on their own, we can pass them through await_transform (i.e. make them awaitable).

6. More Context on:

6.1. Coroutines

Quoting from the below references:

  • Some senders are awaitables
  • All awaitables are senders
  • When a coroutine is suspended, it basically becomes a callback. Callbacks are straightforward receivers, so coroutines are receivers and awaitables are senders.

6.2. Senders and Receivers

  • A sender is just a lazy value.
  • A receiver is like a callback that provides three slots:
    • set_value
    • set_error
    • set_stopped
  • The relationship between the two is that the sender passes its result to the receiver by completing through exactly one of these three slots.
  • A scheduler is a handle to the place where these computations will be performed. Schedulers produce senders, which may be accepted by the async algorithms.
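The description above can be sketched with a toy model in plain C++. The names `just_int` and `recording_receiver` are hypothetical, and the slots are ordinary member functions rather than the customization points that P2300 and HPX actually use; schedulers are left out. The sketch only shows that a sender does nothing until it is connected and started, and that it can be started more than once.

```cpp
#include <exception>
#include <string>
#include <utility>

// Toy receiver with the three completion slots. Plain member functions
// stand in for the real set_value/set_error/set_stopped customization points.
struct recording_receiver
{
    std::string* log;

    void set_value(int v) { *log += "value:" + std::to_string(v) + ";"; }
    void set_error(std::exception_ptr) { *log += "error;"; }
    void set_stopped() { *log += "stopped;"; }
};

// Toy sender: it only describes "produce this int". No work happens until
// it is connected to a receiver and the resulting operation is started.
struct just_int
{
    int value;

    template <typename Receiver>
    struct operation
    {
        int value;
        Receiver receiver;

        void start()
        {
            // Report the result through the value slot.
            receiver.set_value(value);
        }
    };

    template <typename Receiver>
    operation<Receiver> connect(Receiver r) const
    {
        return {value, std::move(r)};
    }
};

std::string run_sender_example()
{
    std::string log;
    just_int sender{42};    // lazy: nothing has run yet

    // The same sender is connected and started twice.
    auto op1 = sender.connect(recording_receiver{&log});
    auto op2 = sender.connect(recording_receiver{&log});
    op1.start();
    op2.start();
    return log;
}
```

Calling `run_sender_example()` returns `"value:42;value:42;"`: each started operation reports once through the value slot.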

7. Work

My PRs can be found using this link as it'll always be updated.

Following are the Merged PRs until now:

Minor Improvements:

  • [execution] Add schedule_result_t alias template
    • This was a warm-up PR to ensure that I can read the standard reference implementation and can write tests that are necessary.
  • [hpx::execution] Added forwarding_scheduler_query
    • I revised a lot of the tag_* methods, especially tag_invoke.
    • Found a minor bug in its implementation and fixed it (link).
    • As far as the tests went, it was pretty clear that we need to test the CPO using a custom implementation.
  • [P2300] enhancements: receiver_of, sender_of improvements
    • A good revision of SFINAE helped me implement the metafunctions is_invocable_variant_of_tuples and is_invocable_variant.
    • Added the sender_of concept, which defines the requirements for a sender type that, on successful completion, sends the specified set of value types.
    • Added the receiver_of concept, which takes a receiver and an instance of the completion_signatures<> class template.
    • Tons of tests for all the above.
  • [P2300] Added fundamental coroutine_traits for S/R
    • Firstly I had to modify the C++ compiler check for C++ 20 Coroutines functionality.
    • This led to an addition of a new hpx/config/coroutines_support.hpp header.
    • An awaiter concept: an awaiter type is a type that implements the three special methods that are called as part of a co_await expression: await_ready, await_suspend and await_resume.
    • An awaitable concept: something that you can apply the co_await operator to. If the promise type defines an await_transform() member, the awaitable is obtained by calling promise.await_transform(value), passing the awaited value. The concept simply checks whether the type supports applying the co_await operator to a value of that type. If the object has either a member or non-member operator co_await(), its return value must satisfy the awaiter concept. Otherwise, the awaitable object must satisfy the awaiter concept itself.
    • Tons of tests as usual

Major Work:

  1. Adapt get_completion_signatures when the sender is an awaitable. This confused me for a while. There was a minor bug in revision 5 of the P2300 specification: it did not consider the environment for awaitables, i.e. it did not consider the promise type. As a result, awaitables whose promise type defines await_transform would not be considered awaitables, and there would be no completion signatures for them. This led to a recent discussion with Eric Niebler, and I think the specification will be revised even further. For now, we can deal with senders that expose a co_await operator.
  2. Utility as_awaitable_t
    • receiver_base, sender_awaitable_base
    • to transform an object into one that is awaitable within a particular coroutine.
  3. Promise base for item 5.
  4. Operation base for item 5.
  5. Utility connect_awaitable to adapt connect mentioned in spec 2.2
  6. Utility with_awaitable_senders
    • Used as the base class of a coroutine promise type, makes senders awaitable in that coroutine type

This is the PR for the above work.

8. Future work

  • Test these on all the algorithms that return a sender.

9. Extra Contribution

  • I discovered a bug in the P2300 specification, which is filed as an issue. A PR to the reference implementation that addresses this issue is currently in progress (here).

10. Example Code

template <typename T>
struct custom_task
{
    template <typename A>
    struct custom;
    using promise_type = custom<T>;

    custom_task() = default;

    custom_task(custom_task&& that) noexcept
      : coro_(std::exchange(that.coro_, {}))
    {
    }

    ~custom_task()
    {
        if (coro_)
            coro_.destroy();
    }

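    // Parameter renamed from __coro: identifiers containing a double
    // underscore are reserved for the implementation.
    explicit custom_task(
        hpx::coro::coroutine_handle<promise_type> coro) noexcept
      : coro_(coro)
    {
    }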

    struct final_awaitable
    {
        static std::false_type await_ready() noexcept
        {
            return {};
        }
        static hpx::coro::coroutine_handle<> await_suspend(
            hpx::coro::coroutine_handle<promise_type> h) noexcept
        {
            return h.promise().continuation();
        }
        static void await_resume() noexcept {}
    };

    template <typename A>
    struct custom
      : hpx::execution::experimental::with_awaitable_senders<custom<T>>
    {
        custom_task get_return_object() noexcept
        {
            return custom_task(
                hpx::coro::coroutine_handle<promise_type>::from_promise(*this));
        }
        hpx::coro::suspend_always initial_suspend() noexcept
        {
            return {};
        }
        final_awaitable final_suspend() noexcept
        {
            return {};
        }
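        // Store the exception in slot 2 of data_ so that await_resume can
        // rethrow it instead of silently dropping it.
        void unhandled_exception() noexcept
        {
            data_.template emplace<2>(std::current_exception());
        }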

        void return_value(T value) noexcept
        {
            data_.template emplace<1>(std::move(value));
        }
        std::variant<std::monostate, T, std::exception_ptr> data_{};

        custom_task context_;
    };

    template <typename ParentPromise = void>
    struct task_awaitable
    {
        hpx::coro::coroutine_handle<promise_type> coro_;
        std::optional<custom<ParentPromise>> context_{};

        static std::false_type await_ready() noexcept
        {
            return {};
        }
        template <typename ParentPromise2>
        hpx::coro::coroutine_handle<> await_suspend(
            hpx::coro::coroutine_handle<ParentPromise2> parent) noexcept
        {
            coro_.promise().set_continuation(parent);
            return coro_;
        }
        T await_resume()
        {
            context_.reset();
            if (coro_.promise().data_.index() == 2)
                std::rethrow_exception(
                    std::get<2>(std::move(coro_.promise().data_)));
            if constexpr (!std::is_void_v<T>)
                return std::get<1>(std::move(coro_.promise().data_));
        }
    };

    friend task_awaitable<> operator co_await(custom_task&& self) noexcept
    {
        return task_awaitable<>{std::exchange(self.coro_, {})};
    }

    template <typename ParentPromise>
    friend task_awaitable<ParentPromise> tag_invoke(
        hpx::execution::experimental::as_awaitable_t, custom_task&& self,
        ParentPromise&) noexcept
    {
        return task_awaitable<ParentPromise>{std::exchange(self.coro_, {})};
    }
    hpx::coro::coroutine_handle<promise_type> coro_;
};

template <typename S1,
    typename = std::enable_if_t<hpx::execution::experimental::is_sender_v<S1>>>
custom_task<int> async_answer_custom(S1 s1)
{
    // Senders are implicitly awaitable (in this coroutine type):
    co_return co_await (S1 &&) s1;
}

// Usage. This fragment belongs inside a function that runs on the HPX
// runtime (for example hpx_main):
    try
    {
        // Awaitables are implicitly senders:
        auto i = hpx::this_thread::experimental::sync_wait(
            async_answer_custom(hpx::execution::experimental::just(42)))
                     .value();
        std::cout << "The answer is " << hpx::get<0>(i) << '\n';
    }
    catch (std::exception const& e)
    {
        std::cout << e.what() << '\n';
    }

11. References
