Skip to content

Instantly share code, notes, and snippets.

@JuanDiegoMontoya
Created December 22, 2023 12:43
Show Gist options
  • Save JuanDiegoMontoya/1b5825e217f4886fb94807cd129e33c4 to your computer and use it in GitHub Desktop.
Save JuanDiegoMontoya/1b5825e217f4886fb94807cd129e33c4 to your computer and use it in GitHub Desktop.
test of sync primitive for libcoro: this semaphore can be incremented and decremented by an arbitrary amount (instead of just 1), which makes it useful for rate-limiting resources
#include <coro/coro.hpp>
namespace test
{
class semaphore
{
public:
enum class acquire_result
{
acquired,
semaphore_stopped
};
static std::string acquire_result_acquired;
static std::string acquire_result_semaphore_stopped;
static std::string acquire_result_unknown;
static auto to_string(acquire_result ar) -> const std::string&
{
switch (ar)
{
case acquire_result::acquired: return acquire_result_acquired;
case acquire_result::semaphore_stopped: return acquire_result_semaphore_stopped;
}
return acquire_result_unknown;
}
explicit semaphore(std::ptrdiff_t least_max_value_and_starting_value);
explicit semaphore(std::ptrdiff_t least_max_value, std::ptrdiff_t starting_value);
~semaphore();
semaphore(const semaphore&) = delete;
semaphore(semaphore&&) = delete;
auto operator=(const semaphore&) noexcept -> semaphore& = delete;
auto operator=(semaphore&&) noexcept -> semaphore& = delete;
class acquire_operation
{
public:
explicit acquire_operation(semaphore& s, ptrdiff_t count);
auto await_ready() const noexcept -> bool;
auto await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool;
auto await_resume() const -> acquire_result;
private:
friend semaphore;
semaphore& m_semaphore;
ptrdiff_t m_count;
std::coroutine_handle<> m_awaiting_coroutine;
acquire_operation* m_next{nullptr};
};
auto release(ptrdiff_t count) -> void;
/**
* Acquires a resource from the semaphore, if the semaphore has no resources available then
* this will wait until a resource becomes available.
*/
[[nodiscard]] auto acquire(ptrdiff_t count) -> acquire_operation
{
return acquire_operation{*this, count};
}
/**
* Attemtps to acquire a resource if there is any resources available.
* @return True if the acquire operation was able to acquire a resource.
*/
auto try_acquire(ptrdiff_t count) -> bool;
/**
* @return The maximum number of resources the semaphore can contain.
*/
auto max() const noexcept -> std::ptrdiff_t
{
return m_least_max_value;
}
/**
* The current number of resources available in this semaphore.
*/
auto value() const noexcept -> std::ptrdiff_t
{
return m_counter.load(std::memory_order::relaxed);
}
/**
* Stops the semaphore and will notify all release/acquire waiters to wake up in a failed state.
* Once this is set it cannot be un-done and all future oprations on the semaphore will fail.
*/
auto notify_waiters() noexcept -> void;
private:
friend class release_operation;
friend class acquire_operation;
const std::ptrdiff_t m_least_max_value;
std::atomic<std::ptrdiff_t> m_counter;
std::mutex m_waiter_mutex{};
acquire_operation* m_acquire_waiters{nullptr};
std::atomic<bool> m_notify_all_set{false};
};
} // namespace coro
namespace test
{
using namespace std::string_literals;
std::string semaphore::acquire_result_acquired = "acquired"s;
std::string semaphore::acquire_result_semaphore_stopped = "semaphore_stopped"s;
std::string semaphore::acquire_result_unknown = "unknown"s;
semaphore::semaphore(std::ptrdiff_t least_max_value_and_starting_value) : semaphore(least_max_value_and_starting_value, least_max_value_and_starting_value) {}
semaphore::semaphore(std::ptrdiff_t least_max_value, std::ptrdiff_t starting_value)
: m_least_max_value(least_max_value), m_counter(starting_value <= least_max_value ? starting_value : least_max_value)
{
}
semaphore::~semaphore()
{
notify_waiters();
}
semaphore::acquire_operation::acquire_operation(semaphore& s, ptrdiff_t count) : m_semaphore(s), m_count(count) {}
auto semaphore::acquire_operation::await_ready() const noexcept -> bool
{
if (m_semaphore.m_notify_all_set.load(std::memory_order::relaxed))
{
return true;
}
return m_semaphore.try_acquire(m_count);
}
auto semaphore::acquire_operation::await_suspend(std::coroutine_handle<> awaiting_coroutine) noexcept -> bool
{
std::unique_lock lk{m_semaphore.m_waiter_mutex};
if (m_semaphore.m_notify_all_set.load(std::memory_order::relaxed))
{
return false;
}
if (m_semaphore.try_acquire(m_count))
{
return false;
}
if (m_semaphore.m_acquire_waiters == nullptr)
{
m_semaphore.m_acquire_waiters = this;
}
else
{
// This is LIFO, but semaphores are not meant to be fair.
// Set our next to the current head.
m_next = m_semaphore.m_acquire_waiters;
// Set the semaphore head to this.
m_semaphore.m_acquire_waiters = this;
}
m_awaiting_coroutine = awaiting_coroutine;
return true;
}
auto semaphore::acquire_operation::await_resume() const -> acquire_result
{
if (m_semaphore.m_notify_all_set.load(std::memory_order::relaxed))
{
return acquire_result::semaphore_stopped;
}
return acquire_result::acquired;
}
auto semaphore::release(ptrdiff_t count) -> void
{
// It seems like the atomic counter could be incremented, but then resuming a waiter could have
// a race between a new acquirer grabbing the just incremented resource value from us. So its
// best to check if there are any waiters first, and transfer owernship of the resource thats
// being released directly to the waiter to avoid this problem.
std::unique_lock lk{m_waiter_mutex};
if (m_acquire_waiters != nullptr)
{
acquire_operation* to_resume = m_acquire_waiters;
m_acquire_waiters = m_acquire_waiters->m_next;
lk.unlock();
// This will transfer ownership of the resource to the resumed waiter.
to_resume->m_awaiting_coroutine.resume();
}
else
{
// Normally would be release but within a lock use releaxed.
m_counter.fetch_add(count, std::memory_order::relaxed);
}
}
auto semaphore::try_acquire(ptrdiff_t count) -> bool
{
// Optimistically grab the resource.
auto previous = m_counter.fetch_sub(count, std::memory_order::acq_rel);
if (previous < count)
{
// If it wasn't available undo the acquisition.
m_counter.fetch_add(count, std::memory_order::release);
return false;
}
return true;
}
auto semaphore::notify_waiters() noexcept -> void
{
m_notify_all_set.exchange(true, std::memory_order::release);
while (true)
{
std::unique_lock lk{m_waiter_mutex};
if (m_acquire_waiters != nullptr)
{
acquire_operation* to_resume = m_acquire_waiters;
m_acquire_waiters = m_acquire_waiters->m_next;
lk.unlock();
to_resume->m_awaiting_coroutine.resume();
}
else
{
break;
}
}
}
} // namespace coro
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment