Created
July 7, 2023 06:25
-
-
Save ashtum/19eb64eae51b150b4fc5086f9790c1dc to your computer and use it in GitHub Desktop.
Asio Work Tracking Executor
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <boost/asio.hpp> | |
#include <fmt/format.h> | |
namespace boost | |
{ | |
namespace asio | |
{ | |
namespace detail | |
{ | |
class tracking_executor_service | |
: public execution_context_service_base<tracking_executor_service> | |
{ | |
std::atomic<uint32_t> outstanding_works_{}; | |
public: | |
tracking_executor_service(execution_context& ctx) | |
: execution_context_service_base<tracking_executor_service>(ctx) | |
{ | |
} | |
void increment_outstanding_works() noexcept | |
{ | |
outstanding_works_.fetch_add(1, std::memory_order_relaxed); | |
} | |
void decrement_outstanding_works() noexcept | |
{ | |
outstanding_works_.fetch_sub(1, std::memory_order_relaxed); | |
} | |
uint32_t get_outstanding_work_count() const noexcept | |
{ | |
return outstanding_works_.load(std::memory_order_relaxed); | |
} | |
void shutdown() noexcept override | |
{ | |
} | |
}; | |
} // namespace detail | |
template<typename Executor> | |
class tracking_executor | |
{ | |
public: | |
typedef Executor inner_executor_type; | |
/// Construct a tracking_executor for the specified executor. | |
tracking_executor( | |
const Executor& ex, | |
detail::tracking_executor_service* service) | |
: executor_(ex) | |
, service_(service) | |
{ | |
} | |
/// Copy constructor. | |
tracking_executor(const tracking_executor& other) noexcept | |
: executor_(other.executor_) | |
, service_(other.service_) | |
{ | |
} | |
/// Converting constructor. | |
template<class OtherExecutor> | |
tracking_executor(const tracking_executor<OtherExecutor>& other) noexcept | |
: executor_(other.executor_) | |
, service_(other.service_) | |
{ | |
} | |
/// Assignment operator. | |
tracking_executor& operator=(const tracking_executor& other) noexcept | |
{ | |
executor_ = other.executor_; | |
service_ = other.service_; | |
return *this; | |
} | |
/// Converting assignment operator. | |
template<class OtherExecutor> | |
tracking_executor& operator=( | |
const tracking_executor<OtherExecutor>& other) noexcept | |
{ | |
executor_ = other.executor_; | |
service_ = other.service_; | |
return *this; | |
} | |
/// Move constructor. | |
tracking_executor(tracking_executor&& other) noexcept | |
: executor_(std::move(other.executor_)) | |
, service_(std::move(other.service_)) | |
{ | |
} | |
/// Converting move constructor. | |
template<class OtherExecutor> | |
tracking_executor(tracking_executor<OtherExecutor>&& other) noexcept | |
: executor_(std::move(other.executor_)) | |
, service_(std::move(other.service_)) | |
{ | |
} | |
/// Move assignment operator. | |
tracking_executor& operator=(tracking_executor&& other) noexcept | |
{ | |
executor_ = std::move(other.executor_); | |
service_ = std::move(other.service_); | |
return *this; | |
} | |
/// Converting move assignment operator. | |
template<class OtherExecutor> | |
tracking_executor& operator=( | |
tracking_executor<OtherExecutor>&& other) noexcept | |
{ | |
executor_ = std::move(other.executor_); | |
service_ = std::move(other.service_); | |
return *this; | |
} | |
/// Destructor. | |
~tracking_executor() noexcept | |
{ | |
} | |
/// Obtain the underlying executor. | |
inner_executor_type get_inner_executor() const noexcept | |
{ | |
return executor_; | |
} | |
/// Forward a query to the underlying executor. | |
template<typename Property> | |
typename constraint< | |
can_query<const Executor&, Property>::value, | |
typename conditional< | |
is_convertible<Property, execution::blocking_t>::value, | |
execution::blocking_t, | |
typename query_result<const Executor&, Property>::type>::type>::type | |
query(const Property& p) const | |
noexcept((is_nothrow_query<const Executor&, Property>::value)) | |
{ | |
return this->query_helper( | |
is_convertible<Property, execution::blocking_t>(), p); | |
} | |
/// Forward a requirement to the underlying executor. | |
template<typename Property> | |
typename constraint< | |
can_require<const Executor&, Property>::value && | |
!is_convertible<Property, execution::blocking_t::always_t>::value, | |
tracking_executor<typename decay< | |
typename require_result<const Executor&, Property>::type>::type>>:: | |
type | |
require(const Property& p) const | |
noexcept((is_nothrow_require<const Executor&, Property>::value)) | |
{ | |
return tracking_executor<typename decay< | |
typename require_result<const Executor&, Property>::type>::type>( | |
boost::asio::require(executor_, p), service_); | |
} | |
/// Forward a preference to the underlying executor. | |
template<typename Property> | |
typename constraint< | |
can_prefer<const Executor&, Property>::value && | |
!is_convertible<Property, execution::blocking_t::always_t>::value, | |
tracking_executor<typename decay< | |
typename prefer_result<const Executor&, Property>::type>::type>>:: | |
type | |
prefer(const Property& p) const | |
noexcept((is_nothrow_prefer<const Executor&, Property>::value)) | |
{ | |
return tracking_executor<typename decay< | |
typename prefer_result<const Executor&, Property>::type>::type>( | |
boost::asio::prefer(executor_, p), service_); | |
} | |
/// Obtain the underlying execution context. | |
execution_context& context() const noexcept | |
{ | |
return executor_.context(); | |
} | |
/// Inform the tracking_executor that it has some outstanding work to do. | |
void on_work_started() const noexcept | |
{ | |
executor_.on_work_started(); | |
} | |
/// Inform the tracking_executor that some work is no longer outstanding. | |
void on_work_finished() const noexcept | |
{ | |
executor_.on_work_finished(); | |
} | |
/// Request the tracking_executor to invoke the given function object. | |
template<typename Function> | |
typename constraint< | |
execution::can_execute<const Executor&, Function>::value, | |
void>::type | |
execute(Function&& f) const | |
{ | |
service_->increment_outstanding_works(); | |
executor_.execute( | |
[s = service_, f = std::move(f)]() mutable | |
{ | |
s->decrement_outstanding_works(); | |
f(); | |
}); | |
} | |
/// Request the tracking_executor to invoke the given function object. | |
template<typename Function, typename Allocator> | |
void dispatch(Function&& f, const Allocator& a) const | |
{ | |
service_->increment_outstanding_works(); | |
executor_.dispatch( | |
[s = service_, f = std::move(f)]() mutable | |
{ | |
s->decrement_outstanding_works(); | |
f(); | |
}, | |
a); | |
} | |
/// Request the tracking_executor to invoke the given function object. | |
template<typename Function, typename Allocator> | |
void post(Function&& f, const Allocator& a) const | |
{ | |
service_->increment_outstanding_works(); | |
executor_.post( | |
[s = service_, f = std::move(f)]() mutable | |
{ | |
s->decrement_outstanding_works(); | |
f(); | |
}, | |
a); | |
} | |
/// Request the tracking_executor to invoke the given function object. | |
template<typename Function, typename Allocator> | |
void defer(Function&& f, const Allocator& a) const | |
{ | |
service_->increment_outstanding_works(); | |
executor_.defer( | |
[s = service_, f = std::move(f)]() mutable | |
{ | |
s->decrement_outstanding_works(); | |
f(); | |
}, | |
a); | |
} | |
/// Determine whether the tracking_executor is running in the current | |
bool running_in_this_thread() const noexcept | |
{ | |
return executor_.running_in_this_thread(); | |
} | |
/// Compare two tracking_executors for equality. | |
friend bool operator==( | |
const tracking_executor& a, | |
const tracking_executor& b) noexcept | |
{ | |
return a.service_ == b.service_; | |
} | |
/// Compare two tracking_executors for inequality. | |
friend bool operator!=( | |
const tracking_executor& a, | |
const tracking_executor& b) noexcept | |
{ | |
return a.service_ != b.service_; | |
} | |
private: | |
template<typename Property> | |
typename query_result<const Executor&, Property>::type query_helper( | |
false_type, | |
const Property& property) const | |
{ | |
return boost::asio::query(executor_, property); | |
} | |
template<typename Property> | |
execution::blocking_t query_helper(true_type, const Property& property) | |
const | |
{ | |
execution::blocking_t result = boost::asio::query(executor_, property); | |
return result == execution::blocking.always | |
? execution::blocking.possibly | |
: result; | |
} | |
Executor executor_; | |
detail::tracking_executor_service* service_; | |
}; | |
template<typename Executor> | |
tracking_executor<Executor> make_tracking_executor( | |
const Executor& ex, | |
typename constraint< | |
is_executor<Executor>::value || | |
execution::is_executor<Executor>::value>::type = 0) | |
{ | |
return tracking_executor<Executor>( | |
ex, | |
&use_service<detail::tracking_executor_service>( | |
boost::asio::query(ex, execution::context))); | |
} | |
template<typename ExecutionContext> | |
tracking_executor<typename ExecutionContext::executor_type> | |
make_tracking_executor( | |
ExecutionContext& ctx, | |
typename constraint< | |
is_convertible<ExecutionContext&, execution_context&>::value>::type = 0) | |
{ | |
return tracking_executor<typename ExecutionContext::executor_type>( | |
ctx.get_executor(), | |
&use_service<detail::tracking_executor_service>(ctx)); | |
} | |
template<typename Executor> | |
uint32_t get_outstanding_work_count( | |
const Executor& ex, | |
typename constraint< | |
is_executor<Executor>::value || | |
execution::is_executor<Executor>::value>::type = 0) | |
{ | |
return use_service<detail::tracking_executor_service>( | |
boost::asio::query(ex, execution::context)) | |
.get_outstanding_work_count(); | |
} | |
template<typename ExecutionContext> | |
uint32_t get_outstanding_work_count( | |
ExecutionContext& ctx, | |
typename constraint< | |
is_convertible<ExecutionContext&, execution_context&>::value>::type = 0) | |
{ | |
return use_service<detail::tracking_executor_service>(ctx) | |
.get_outstanding_work_count(); | |
} | |
} // namespace asio | |
} // namespace boost | |
namespace asio = boost::asio; | |
asio::awaitable<void> async_main() | |
{ | |
auto exec = co_await asio::this_coro::executor; | |
auto timer = asio::steady_timer{ exec }; | |
for (auto i = 0; i < 10; i++) | |
{ | |
for (auto j = 0; j < 1000; j++) | |
asio::post( | |
exec, | |
[] | |
{ | |
// simulates a computationally heavy task | |
std::this_thread::sleep_for(std::chrono::microseconds{ 1 }); | |
}); | |
fmt::print("Outstanding works:{}\n", get_outstanding_work_count(exec)); | |
timer.expires_after(std::chrono::milliseconds{ 100 }); | |
co_await timer.async_wait(asio::deferred); | |
} | |
co_return; | |
} | |
int main() | |
{ | |
asio::thread_pool tp{ 8 }; | |
auto tracking_exec = asio::make_tracking_executor(tp); | |
asio::co_spawn(tracking_exec, async_main(), asio::detached); | |
tp.join(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment