Skip to content

Instantly share code, notes, and snippets.

@ashtum
Created July 7, 2023 06:25
Show Gist options
  • Save ashtum/19eb64eae51b150b4fc5086f9790c1dc to your computer and use it in GitHub Desktop.
Save ashtum/19eb64eae51b150b4fc5086f9790c1dc to your computer and use it in GitHub Desktop.
Asio Work Tracking Executor
#include <boost/asio.hpp>
#include <fmt/format.h>
namespace boost
{
namespace asio
{
namespace detail
{
class tracking_executor_service
: public execution_context_service_base<tracking_executor_service>
{
std::atomic<uint32_t> outstanding_works_{};
public:
tracking_executor_service(execution_context& ctx)
: execution_context_service_base<tracking_executor_service>(ctx)
{
}
void increment_outstanding_works() noexcept
{
outstanding_works_.fetch_add(1, std::memory_order_relaxed);
}
void decrement_outstanding_works() noexcept
{
outstanding_works_.fetch_sub(1, std::memory_order_relaxed);
}
uint32_t get_outstanding_work_count() const noexcept
{
return outstanding_works_.load(std::memory_order_relaxed);
}
void shutdown() noexcept override
{
}
};
} // namespace detail
template<typename Executor>
class tracking_executor
{
public:
typedef Executor inner_executor_type;
/// Construct a tracking_executor for the specified executor.
tracking_executor(
const Executor& ex,
detail::tracking_executor_service* service)
: executor_(ex)
, service_(service)
{
}
/// Copy constructor.
tracking_executor(const tracking_executor& other) noexcept
: executor_(other.executor_)
, service_(other.service_)
{
}
/// Converting constructor.
template<class OtherExecutor>
tracking_executor(const tracking_executor<OtherExecutor>& other) noexcept
: executor_(other.executor_)
, service_(other.service_)
{
}
/// Assignment operator.
tracking_executor& operator=(const tracking_executor& other) noexcept
{
executor_ = other.executor_;
service_ = other.service_;
return *this;
}
/// Converting assignment operator.
template<class OtherExecutor>
tracking_executor& operator=(
const tracking_executor<OtherExecutor>& other) noexcept
{
executor_ = other.executor_;
service_ = other.service_;
return *this;
}
/// Move constructor.
tracking_executor(tracking_executor&& other) noexcept
: executor_(std::move(other.executor_))
, service_(std::move(other.service_))
{
}
/// Converting move constructor.
template<class OtherExecutor>
tracking_executor(tracking_executor<OtherExecutor>&& other) noexcept
: executor_(std::move(other.executor_))
, service_(std::move(other.service_))
{
}
/// Move assignment operator.
tracking_executor& operator=(tracking_executor&& other) noexcept
{
executor_ = std::move(other.executor_);
service_ = std::move(other.service_);
return *this;
}
/// Converting move assignment operator.
template<class OtherExecutor>
tracking_executor& operator=(
tracking_executor<OtherExecutor>&& other) noexcept
{
executor_ = std::move(other.executor_);
service_ = std::move(other.service_);
return *this;
}
/// Destructor.
~tracking_executor() noexcept
{
}
/// Obtain the underlying executor.
inner_executor_type get_inner_executor() const noexcept
{
return executor_;
}
/// Forward a query to the underlying executor.
template<typename Property>
typename constraint<
can_query<const Executor&, Property>::value,
typename conditional<
is_convertible<Property, execution::blocking_t>::value,
execution::blocking_t,
typename query_result<const Executor&, Property>::type>::type>::type
query(const Property& p) const
noexcept((is_nothrow_query<const Executor&, Property>::value))
{
return this->query_helper(
is_convertible<Property, execution::blocking_t>(), p);
}
/// Forward a requirement to the underlying executor.
template<typename Property>
typename constraint<
can_require<const Executor&, Property>::value &&
!is_convertible<Property, execution::blocking_t::always_t>::value,
tracking_executor<typename decay<
typename require_result<const Executor&, Property>::type>::type>>::
type
require(const Property& p) const
noexcept((is_nothrow_require<const Executor&, Property>::value))
{
return tracking_executor<typename decay<
typename require_result<const Executor&, Property>::type>::type>(
boost::asio::require(executor_, p), service_);
}
/// Forward a preference to the underlying executor.
template<typename Property>
typename constraint<
can_prefer<const Executor&, Property>::value &&
!is_convertible<Property, execution::blocking_t::always_t>::value,
tracking_executor<typename decay<
typename prefer_result<const Executor&, Property>::type>::type>>::
type
prefer(const Property& p) const
noexcept((is_nothrow_prefer<const Executor&, Property>::value))
{
return tracking_executor<typename decay<
typename prefer_result<const Executor&, Property>::type>::type>(
boost::asio::prefer(executor_, p), service_);
}
/// Obtain the underlying execution context.
execution_context& context() const noexcept
{
return executor_.context();
}
/// Inform the tracking_executor that it has some outstanding work to do.
void on_work_started() const noexcept
{
executor_.on_work_started();
}
/// Inform the tracking_executor that some work is no longer outstanding.
void on_work_finished() const noexcept
{
executor_.on_work_finished();
}
/// Request the tracking_executor to invoke the given function object.
template<typename Function>
typename constraint<
execution::can_execute<const Executor&, Function>::value,
void>::type
execute(Function&& f) const
{
service_->increment_outstanding_works();
executor_.execute(
[s = service_, f = std::move(f)]() mutable
{
s->decrement_outstanding_works();
f();
});
}
/// Request the tracking_executor to invoke the given function object.
template<typename Function, typename Allocator>
void dispatch(Function&& f, const Allocator& a) const
{
service_->increment_outstanding_works();
executor_.dispatch(
[s = service_, f = std::move(f)]() mutable
{
s->decrement_outstanding_works();
f();
},
a);
}
/// Request the tracking_executor to invoke the given function object.
template<typename Function, typename Allocator>
void post(Function&& f, const Allocator& a) const
{
service_->increment_outstanding_works();
executor_.post(
[s = service_, f = std::move(f)]() mutable
{
s->decrement_outstanding_works();
f();
},
a);
}
/// Request the tracking_executor to invoke the given function object.
template<typename Function, typename Allocator>
void defer(Function&& f, const Allocator& a) const
{
service_->increment_outstanding_works();
executor_.defer(
[s = service_, f = std::move(f)]() mutable
{
s->decrement_outstanding_works();
f();
},
a);
}
/// Determine whether the tracking_executor is running in the current
bool running_in_this_thread() const noexcept
{
return executor_.running_in_this_thread();
}
/// Compare two tracking_executors for equality.
friend bool operator==(
const tracking_executor& a,
const tracking_executor& b) noexcept
{
return a.service_ == b.service_;
}
/// Compare two tracking_executors for inequality.
friend bool operator!=(
const tracking_executor& a,
const tracking_executor& b) noexcept
{
return a.service_ != b.service_;
}
private:
template<typename Property>
typename query_result<const Executor&, Property>::type query_helper(
false_type,
const Property& property) const
{
return boost::asio::query(executor_, property);
}
template<typename Property>
execution::blocking_t query_helper(true_type, const Property& property)
const
{
execution::blocking_t result = boost::asio::query(executor_, property);
return result == execution::blocking.always
? execution::blocking.possibly
: result;
}
Executor executor_;
detail::tracking_executor_service* service_;
};
template<typename Executor>
tracking_executor<Executor> make_tracking_executor(
const Executor& ex,
typename constraint<
is_executor<Executor>::value ||
execution::is_executor<Executor>::value>::type = 0)
{
return tracking_executor<Executor>(
ex,
&use_service<detail::tracking_executor_service>(
boost::asio::query(ex, execution::context)));
}
template<typename ExecutionContext>
tracking_executor<typename ExecutionContext::executor_type>
make_tracking_executor(
ExecutionContext& ctx,
typename constraint<
is_convertible<ExecutionContext&, execution_context&>::value>::type = 0)
{
return tracking_executor<typename ExecutionContext::executor_type>(
ctx.get_executor(),
&use_service<detail::tracking_executor_service>(ctx));
}
template<typename Executor>
uint32_t get_outstanding_work_count(
const Executor& ex,
typename constraint<
is_executor<Executor>::value ||
execution::is_executor<Executor>::value>::type = 0)
{
return use_service<detail::tracking_executor_service>(
boost::asio::query(ex, execution::context))
.get_outstanding_work_count();
}
template<typename ExecutionContext>
uint32_t get_outstanding_work_count(
ExecutionContext& ctx,
typename constraint<
is_convertible<ExecutionContext&, execution_context&>::value>::type = 0)
{
return use_service<detail::tracking_executor_service>(ctx)
.get_outstanding_work_count();
}
} // namespace asio
} // namespace boost
namespace asio = boost::asio;
asio::awaitable<void> async_main()
{
auto exec = co_await asio::this_coro::executor;
auto timer = asio::steady_timer{ exec };
for (auto i = 0; i < 10; i++)
{
for (auto j = 0; j < 1000; j++)
asio::post(
exec,
[]
{
// simulates a computationally heavy task
std::this_thread::sleep_for(std::chrono::microseconds{ 1 });
});
fmt::print("Outstanding works:{}\n", get_outstanding_work_count(exec));
timer.expires_after(std::chrono::milliseconds{ 100 });
co_await timer.async_wait(asio::deferred);
}
co_return;
}
int main()
{
asio::thread_pool tp{ 8 };
auto tracking_exec = asio::make_tracking_executor(tp);
asio::co_spawn(tracking_exec, async_main(), asio::detached);
tp.join();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment