@mratsim
Last active November 19, 2023 13:52
Eventcounts
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <atomic>
#include <climits>
#include <thread>
#include <glog/logging.h>
#include <folly/Likely.h>
#include <folly/detail/Futex.h>
#include <folly/lang/Bits.h>
#include <folly/portability/SysTime.h>
#include <folly/portability/Unistd.h>
namespace folly {
/**
* Event count: a condition variable for lock free algorithms.
*
* See http://www.1024cores.net/home/lock-free-algorithms/eventcounts for
* details.
*
* Event counts allow you to convert a non-blocking lock-free / wait-free
* algorithm into a blocking one, by isolating the blocking logic. You call
* prepareWait() before checking your condition and then either cancelWait()
* or wait() depending on whether the condition was true. When another
* thread makes the condition true, it must call notify() / notifyAll() just
* like a regular condition variable.
*
* If "<" denotes the happens-before relationship, consider 2 threads (T1 and
* T2) and 3 events:
* - E1: T1 returns from prepareWait
* - E2: T1 calls wait
* (obviously E1 < E2, intra-thread)
* - E3: T2 calls notifyAll
*
* If E1 < E3, then E2's wait will complete (and T1 will either wake up,
* or not block at all)
*
* This means that you can use an EventCount in the following manner:
*
* Waiter:
* if (!condition()) { // handle fast path first
* for (;;) {
* auto key = eventCount.prepareWait();
* if (condition()) {
* eventCount.cancelWait();
* break;
* } else {
* eventCount.wait(key);
* }
* }
* }
*
* (This pattern is encapsulated in await())
*
* Poster:
* make_condition_true();
* eventCount.notifyAll();
*
* Note that, just like with regular condition variables, the waiter needs to
* be tolerant of spurious wakeups and needs to recheck the condition after
* being woken up. Also, as there is no mutual exclusion implied, "checking"
* the condition likely means attempting an operation on an underlying
* data structure (push into a lock-free queue, etc) and returning true on
* success and false on failure.
*/
class EventCount {
public:
EventCount() noexcept : val_(0) {}
class Key {
friend class EventCount;
explicit Key(uint32_t e) noexcept : epoch_(e) {}
uint32_t epoch_;
};
void notify() noexcept;
void notifyAll() noexcept;
Key prepareWait() noexcept;
void cancelWait() noexcept;
void wait(Key key) noexcept;
/**
* Wait for condition() to become true. Will clean up appropriately if
* condition() throws, and then rethrow.
*/
template <class Condition>
void await(Condition condition);
private:
void doNotify(int n) noexcept;
EventCount(const EventCount&) = delete;
EventCount(EventCount&&) = delete;
EventCount& operator=(const EventCount&) = delete;
EventCount& operator=(EventCount&&) = delete;
// This requires 64-bit
static_assert(sizeof(int) == 4, "bad platform");
static_assert(sizeof(uint32_t) == 4, "bad platform");
static_assert(sizeof(uint64_t) == 8, "bad platform");
static_assert(sizeof(std::atomic<uint64_t>) == 8, "bad platform");
static_assert(sizeof(detail::Futex<std::atomic>) == 4, "bad platform");
static constexpr size_t kEpochOffset = kIsLittleEndian ? 1 : 0;
// val_ stores the epoch in the most significant 32 bits and the
// waiter count in the least significant 32 bits.
std::atomic<uint64_t> val_;
static constexpr uint64_t kAddWaiter = uint64_t(1);
static constexpr uint64_t kSubWaiter = uint64_t(-1);
static constexpr size_t kEpochShift = 32;
static constexpr uint64_t kAddEpoch = uint64_t(1) << kEpochShift;
static constexpr uint64_t kWaiterMask = kAddEpoch - 1;
};
inline void EventCount::notify() noexcept {
doNotify(1);
}
inline void EventCount::notifyAll() noexcept {
doNotify(INT_MAX);
}
inline void EventCount::doNotify(int n) noexcept {
uint64_t prev = val_.fetch_add(kAddEpoch, std::memory_order_acq_rel);
if (UNLIKELY(prev & kWaiterMask)) {
detail::futexWake(
reinterpret_cast<detail::Futex<std::atomic>*>(&val_) + kEpochOffset, n);
}
}
inline EventCount::Key EventCount::prepareWait() noexcept {
uint64_t prev = val_.fetch_add(kAddWaiter, std::memory_order_acq_rel);
return Key(prev >> kEpochShift);
}
inline void EventCount::cancelWait() noexcept {
// memory_order_relaxed would suffice for correctness, but the faster
// #waiters gets to 0, the less likely it is that we'll do spurious wakeups
// (and thus system calls).
uint64_t prev = val_.fetch_add(kSubWaiter, std::memory_order_seq_cst);
DCHECK_NE((prev & kWaiterMask), 0);
}
inline void EventCount::wait(Key key) noexcept {
while ((val_.load(std::memory_order_acquire) >> kEpochShift) == key.epoch_) {
detail::futexWait(
reinterpret_cast<detail::Futex<std::atomic>*>(&val_) + kEpochOffset,
key.epoch_);
}
// memory_order_relaxed would suffice for correctness, but the faster
// #waiters gets to 0, the less likely it is that we'll do spurious wakeups
// (and thus system calls)
uint64_t prev = val_.fetch_add(kSubWaiter, std::memory_order_seq_cst);
DCHECK_NE((prev & kWaiterMask), 0);
}
template <class Condition>
void EventCount::await(Condition condition) {
if (condition()) {
return; // fast path
}
// condition() is the only thing that may throw, everything else is
// noexcept, so we can hoist the try/catch block outside of the loop
try {
for (;;) {
auto key = prepareWait();
if (condition()) {
cancelWait();
break;
} else {
wait(key);
}
}
} catch (...) {
cancelWait();
throw;
}
}
} // namespace folly
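// ---------------------------------------------------------------------------
// Usage sketch (added for illustration, not part of the Folly header above):
// a blocking pop built on EventCount::await(). `LockFreeQueue` is an assumed
// type with a non-blocking try_pop(); only the EventCount calls are real.
template <class T, class LockFreeQueue>
class BlockingQueue {
 public:
  void push(T v) {
    queue_.push(std::move(v)); // make the condition true first...
    ec_.notifyAll();           // ...then notify, exactly as documented above
  }
  T pop() {
    T out;
    // await() wraps the prepareWait()/cancelWait()/wait() loop and re-checks
    // the condition, so spurious wakeups are handled for us.
    ec_.await([&] { return queue_.try_pop(out); });
    return out;
  }
 private:
  LockFreeQueue queue_;
  folly::EventCount ec_;
};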
// 2019/02/09 - created by Tsung-Wei Huang
// - modified the event count from Eigen
#pragma once
#include <iostream>
#include <vector>
#include <cstdlib>
#include <cstdio>
#include <atomic>
#include <memory>
#include <deque>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <algorithm>
#include <numeric>
#include <cassert>
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
namespace tf {
// Notifier allows waiting for arbitrary predicates in non-blocking
// algorithms. Think of it as a condition variable, but the wait predicate does
// not need to be protected by a mutex. Usage:
// Waiting thread does:
//
// if (predicate)
// return act();
// Notifier::Waiter& w = waiters[my_index];
// ec.prepare_wait(&w);
// if (predicate) {
// ec.cancel_wait(&w);
// return act();
// }
// ec.commit_wait(&w);
//
// Notifying thread does:
//
// predicate = true;
// ec.notify(true);
//
// notify is cheap if there are no waiting threads. prepare_wait/commit_wait are
// not cheap, but they are executed only if the preceding predicate check has
// failed.
//
// Algorithm outline:
// There are two main variables: predicate (managed by the user) and _state.
// The operation closely resembles Dekker's mutual exclusion algorithm:
// https://en.wikipedia.org/wiki/Dekker%27s_algorithm
// The waiting thread sets _state then checks the predicate; the notifying
// thread sets the predicate then checks _state. Due to the seq_cst fences in
// between these operations it is guaranteed that either the waiter will see
// the predicate change and won't block, or the notifying thread will see the
// _state change and will unblock the waiter, or both. But it can't happen
// that both threads miss each other's changes, which would lead to deadlock.
class Notifier {
friend class Executor;
public:
struct Waiter {
std::atomic<Waiter*> next;
std::mutex mu;
std::condition_variable cv;
uint64_t epoch;
unsigned state;
enum {
kNotSignaled,
kWaiting,
kSignaled,
};
};
explicit Notifier(size_t N) : _waiters{N} {
assert(_waiters.size() < (1 << kWaiterBits) - 1);
// Initialize epoch to something close to overflow to test overflow.
_state = kStackMask | (kEpochMask - kEpochInc * _waiters.size() * 2);
}
~Notifier() {
// Ensure there are no waiters.
assert((_state.load() & (kStackMask | kWaiterMask)) == kStackMask);
}
// prepare_wait prepares for waiting.
// After calling this function the thread must re-check the wait predicate
// and call either cancel_wait or commit_wait passing the same Waiter object.
void prepare_wait(Waiter* w) {
w->epoch = _state.fetch_add(kWaiterInc, std::memory_order_relaxed);
std::atomic_thread_fence(std::memory_order_seq_cst);
}
// commit_wait commits waiting.
void commit_wait(Waiter* w) {
w->state = Waiter::kNotSignaled;
// Modification epoch of this waiter.
uint64_t epoch =
(w->epoch & kEpochMask) +
(((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
uint64_t state = _state.load(std::memory_order_seq_cst);
for (;;) {
if (int64_t((state & kEpochMask) - epoch) < 0) {
// The preceding waiter has not decided on its fate. Wait until it
// calls either cancel_wait or commit_wait, or is notified.
std::this_thread::yield();
state = _state.load(std::memory_order_seq_cst);
continue;
}
// We've already been notified.
if (int64_t((state & kEpochMask) - epoch) > 0) return;
// Remove this thread from prewait counter and add it to the waiter list.
assert((state & kWaiterMask) != 0);
uint64_t newstate = state - kWaiterInc + kEpochInc;
//newstate = (newstate & ~kStackMask) | (w - &_waiters[0]);
newstate = static_cast<uint64_t>((newstate & ~kStackMask) | static_cast<uint64_t>(w - &_waiters[0]));
if ((state & kStackMask) == kStackMask)
w->next.store(nullptr, std::memory_order_relaxed);
else
w->next.store(&_waiters[state & kStackMask], std::memory_order_relaxed);
if (_state.compare_exchange_weak(state, newstate,
std::memory_order_release))
break;
}
_park(w);
}
// cancel_wait cancels effects of the previous prepare_wait call.
void cancel_wait(Waiter* w) {
uint64_t epoch =
(w->epoch & kEpochMask) +
(((w->epoch & kWaiterMask) >> kWaiterShift) << kEpochShift);
uint64_t state = _state.load(std::memory_order_relaxed);
for (;;) {
if (int64_t((state & kEpochMask) - epoch) < 0) {
// The preceding waiter has not decided on its fate. Wait until it
// calls either cancel_wait or commit_wait, or is notified.
std::this_thread::yield();
state = _state.load(std::memory_order_relaxed);
continue;
}
// We've already been notified.
if (int64_t((state & kEpochMask) - epoch) > 0) return;
// Remove this thread from prewait counter.
assert((state & kWaiterMask) != 0);
if (_state.compare_exchange_weak(state, state - kWaiterInc + kEpochInc,
std::memory_order_relaxed))
return;
}
}
// notify wakes one or all waiting threads.
// Must be called after changing the associated wait predicate.
void notify(bool all) {
std::atomic_thread_fence(std::memory_order_seq_cst);
uint64_t state = _state.load(std::memory_order_acquire);
for (;;) {
// Easy case: no waiters.
if ((state & kStackMask) == kStackMask && (state & kWaiterMask) == 0)
return;
uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
uint64_t newstate;
if (all) {
// Reset prewait counter and empty wait list.
newstate = (state & kEpochMask) + (kEpochInc * waiters) + kStackMask;
} else if (waiters) {
// There is a thread in pre-wait state, unblock it.
newstate = state + kEpochInc - kWaiterInc;
} else {
// Pop a waiter from list and unpark it.
Waiter* w = &_waiters[state & kStackMask];
Waiter* wnext = w->next.load(std::memory_order_relaxed);
uint64_t next = kStackMask;
//if (wnext != nullptr) next = wnext - &_waiters[0];
if (wnext != nullptr) next = static_cast<uint64_t>(wnext - &_waiters[0]);
// Note: we don't add kEpochInc here. ABA problem on the lock-free stack
// can't happen because a waiter is re-pushed onto the stack only after
// it was in the pre-wait state which inevitably leads to epoch
// increment.
newstate = (state & kEpochMask) + next;
}
if (_state.compare_exchange_weak(state, newstate,
std::memory_order_acquire)) {
if (!all && waiters) return; // unblocked pre-wait thread
if ((state & kStackMask) == kStackMask) return;
Waiter* w = &_waiters[state & kStackMask];
if (!all) w->next.store(nullptr, std::memory_order_relaxed);
_unpark(w);
return;
}
}
}
// notify n workers
void notify_n(size_t n) {
if(n >= _waiters.size()) {
notify(true);
}
else {
for(size_t k=0; k<n; ++k) {
notify(false);
}
}
}
size_t size() const {
return _waiters.size();
}
private:
// _state layout:
// - low kStackBits is a stack of waiters committed wait.
// - next kWaiterBits is count of waiters in prewait state.
// - next kEpochBits is modification counter.
static const uint64_t kStackBits = 16;
static const uint64_t kStackMask = (1ull << kStackBits) - 1;
static const uint64_t kWaiterBits = 16;
static const uint64_t kWaiterShift = 16;
static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1)
<< kWaiterShift;
static const uint64_t kWaiterInc = 1ull << kWaiterBits;
static const uint64_t kEpochBits = 32;
static const uint64_t kEpochShift = 32;
static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift;
static const uint64_t kEpochInc = 1ull << kEpochShift;
std::atomic<uint64_t> _state;
std::vector<Waiter> _waiters;
void _park(Waiter* w) {
std::unique_lock<std::mutex> lock(w->mu);
while (w->state != Waiter::kSignaled) {
w->state = Waiter::kWaiting;
w->cv.wait(lock);
}
}
void _unpark(Waiter* waiters) {
Waiter* next = nullptr;
for (Waiter* w = waiters; w; w = next) {
next = w->next.load(std::memory_order_relaxed);
unsigned state;
{
std::unique_lock<std::mutex> lock(w->mu);
state = w->state;
w->state = Waiter::kSignaled;
}
// Avoid notifying if it wasn't waiting.
if (state == Waiter::kWaiting) w->cv.notify_one();
}
}
};
} // namespace tf
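// ---------------------------------------------------------------------------
// Usage sketch (added for illustration, not from Taskflow): a worker loop
// following the prepare_wait/cancel_wait/commit_wait protocol described in
// the comment above. In Taskflow the Executor (a friend class) supplies the
// per-worker Waiter; the Queue and Task types and try_pop() are assumptions.
template <typename Queue, typename Task>
void worker_loop(tf::Notifier& ec, tf::Notifier::Waiter* w, Queue& queue) {
  for (;;) {
    Task t;
    if (queue.try_pop(t)) { t(); continue; } // predicate already true: act
    ec.prepare_wait(w);
    if (queue.try_pop(t)) {  // re-check the predicate after prepare_wait
      ec.cancel_wait(w);
      t();
      continue;
    }
    ec.commit_wait(w);       // park until a notify() wakes us
  }
}
// Notifying side: make the predicate true first, then notify:
//   queue.push(task);
//   ec.notify(false);       // notify(true) wakes every waiter
// ------------------------------------------------------------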
// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
#ifndef EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
#define EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
namespace Eigen {
// EventCount allows waiting for arbitrary predicates in non-blocking
// algorithms. Think of it as a condition variable, but the wait predicate does
// not need to be protected by a mutex. Usage:
// Waiting thread does:
//
// if (predicate)
// return act();
// EventCount::Waiter& w = waiters[my_index];
// ec.Prewait();
// if (predicate) {
// ec.CancelWait();
// return act();
// }
// ec.CommitWait(&w);
//
// Notifying thread does:
//
// predicate = true;
// ec.Notify(true);
//
// Notify is cheap if there are no waiting threads. Prewait/CommitWait are not
// cheap, but they are executed only if the preceding predicate check has
// failed.
//
// Algorithm outline:
// There are two main variables: predicate (managed by the user) and state_.
// The operation closely resembles Dekker's mutual exclusion algorithm:
// https://en.wikipedia.org/wiki/Dekker%27s_algorithm
// The waiting thread sets state_ then checks the predicate; the notifying
// thread sets the predicate then checks state_. Due to the seq_cst fences in
// between these operations it is guaranteed that either the waiter will see
// the predicate change and won't block, or the notifying thread will see the
// state_ change and will unblock the waiter, or both. But it can't happen
// that both threads miss each other's changes, which would lead to deadlock.
class EventCount {
public:
class Waiter;
EventCount(MaxSizeVector<Waiter>& waiters)
: state_(kStackMask), waiters_(waiters) {
eigen_plain_assert(waiters.size() < (1 << kWaiterBits) - 1);
}
~EventCount() {
// Ensure there are no waiters.
eigen_plain_assert(state_.load() == kStackMask);
}
// Prewait prepares for waiting.
// After calling Prewait, the thread must re-check the wait predicate
// and then call either CancelWait or CommitWait.
void Prewait() {
uint64_t state = state_.load(std::memory_order_relaxed);
for (;;) {
CheckState(state);
uint64_t newstate = state + kWaiterInc;
CheckState(newstate);
if (state_.compare_exchange_weak(state, newstate,
std::memory_order_seq_cst))
return;
}
}
// CommitWait commits waiting after Prewait.
void CommitWait(Waiter* w) {
eigen_plain_assert((w->epoch & ~kEpochMask) == 0);
w->state = Waiter::kNotSignaled;
const uint64_t me = (w - &waiters_[0]) | w->epoch;
uint64_t state = state_.load(std::memory_order_seq_cst);
for (;;) {
CheckState(state, true);
uint64_t newstate;
if ((state & kSignalMask) != 0) {
// Consume the signal and return immediately.
newstate = state - kWaiterInc - kSignalInc;
} else {
// Remove this thread from pre-wait counter and add to the waiter stack.
newstate = ((state & kWaiterMask) - kWaiterInc) | me;
w->next.store(state & (kStackMask | kEpochMask),
std::memory_order_relaxed);
}
CheckState(newstate);
if (state_.compare_exchange_weak(state, newstate,
std::memory_order_acq_rel)) {
if ((state & kSignalMask) == 0) {
w->epoch += kEpochInc;
Park(w);
}
return;
}
}
}
// CancelWait cancels effects of the previous Prewait call.
void CancelWait() {
uint64_t state = state_.load(std::memory_order_relaxed);
for (;;) {
CheckState(state, true);
uint64_t newstate = state - kWaiterInc;
// We don't know if the thread was also notified or not,
// so we should not consume a signal unconditionally.
// Only if the number of waiters equals the number of signals do we
// know that the thread was notified and we must take away the signal.
if (((state & kWaiterMask) >> kWaiterShift) ==
((state & kSignalMask) >> kSignalShift))
newstate -= kSignalInc;
CheckState(newstate);
if (state_.compare_exchange_weak(state, newstate,
std::memory_order_acq_rel))
return;
}
}
// Notify wakes one or all waiting threads.
// Must be called after changing the associated wait predicate.
void Notify(bool notifyAll) {
std::atomic_thread_fence(std::memory_order_seq_cst);
uint64_t state = state_.load(std::memory_order_acquire);
for (;;) {
CheckState(state);
const uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
const uint64_t signals = (state & kSignalMask) >> kSignalShift;
// Easy case: no waiters.
if ((state & kStackMask) == kStackMask && waiters == signals) return;
uint64_t newstate;
if (notifyAll) {
// Empty wait stack and set signal to number of pre-wait threads.
newstate =
(state & kWaiterMask) | (waiters << kSignalShift) | kStackMask;
} else if (signals < waiters) {
// There is a thread in pre-wait state, unblock it.
newstate = state + kSignalInc;
} else {
// Pop a waiter from list and unpark it.
Waiter* w = &waiters_[state & kStackMask];
uint64_t next = w->next.load(std::memory_order_relaxed);
newstate = (state & (kWaiterMask | kSignalMask)) | next;
}
CheckState(newstate);
if (state_.compare_exchange_weak(state, newstate,
std::memory_order_acq_rel)) {
if (!notifyAll && (signals < waiters))
return; // unblocked pre-wait thread
if ((state & kStackMask) == kStackMask) return;
Waiter* w = &waiters_[state & kStackMask];
if (!notifyAll) w->next.store(kStackMask, std::memory_order_relaxed);
Unpark(w);
return;
}
}
}
class Waiter {
friend class EventCount;
// Align to 128 byte boundary to prevent false sharing with other Waiter
// objects in the same vector.
EIGEN_ALIGN_TO_BOUNDARY(128) std::atomic<uint64_t> next;
std::mutex mu;
std::condition_variable cv;
uint64_t epoch = 0;
unsigned state = kNotSignaled;
enum {
kNotSignaled,
kWaiting,
kSignaled,
};
};
private:
// State_ layout:
// - low kWaiterBits is a stack of waiters committed wait
// (indexes in waiters_ array are used as stack elements,
// kStackMask means empty stack).
// - next kWaiterBits is count of waiters in prewait state.
// - next kWaiterBits is count of pending signals.
// - remaining bits are ABA counter for the stack.
// (stored in Waiter node and incremented on push).
static const uint64_t kWaiterBits = 14;
static const uint64_t kStackMask = (1ull << kWaiterBits) - 1;
static const uint64_t kWaiterShift = kWaiterBits;
static const uint64_t kWaiterMask = ((1ull << kWaiterBits) - 1)
<< kWaiterShift;
static const uint64_t kWaiterInc = 1ull << kWaiterShift;
static const uint64_t kSignalShift = 2 * kWaiterBits;
static const uint64_t kSignalMask = ((1ull << kWaiterBits) - 1)
<< kSignalShift;
static const uint64_t kSignalInc = 1ull << kSignalShift;
static const uint64_t kEpochShift = 3 * kWaiterBits;
static const uint64_t kEpochBits = 64 - kEpochShift;
static const uint64_t kEpochMask = ((1ull << kEpochBits) - 1) << kEpochShift;
static const uint64_t kEpochInc = 1ull << kEpochShift;
std::atomic<uint64_t> state_;
MaxSizeVector<Waiter>& waiters_;
static void CheckState(uint64_t state, bool waiter = false) {
static_assert(kEpochBits >= 20, "not enough bits to prevent ABA problem");
const uint64_t waiters = (state & kWaiterMask) >> kWaiterShift;
const uint64_t signals = (state & kSignalMask) >> kSignalShift;
eigen_plain_assert(waiters >= signals);
eigen_plain_assert(waiters < (1 << kWaiterBits) - 1);
eigen_plain_assert(!waiter || waiters > 0);
(void)waiters;
(void)signals;
}
void Park(Waiter* w) {
std::unique_lock<std::mutex> lock(w->mu);
while (w->state != Waiter::kSignaled) {
w->state = Waiter::kWaiting;
w->cv.wait(lock);
}
}
void Unpark(Waiter* w) {
for (Waiter* next; w; w = next) {
uint64_t wnext = w->next.load(std::memory_order_relaxed) & kStackMask;
next = wnext == kStackMask ? nullptr : &waiters_[wnext];
unsigned state;
{
std::unique_lock<std::mutex> lock(w->mu);
state = w->state;
w->state = Waiter::kSignaled;
}
// Avoid notifying if it wasn't waiting.
if (state == Waiter::kWaiting) w->cv.notify_one();
}
}
EventCount(const EventCount&) = delete;
void operator=(const EventCount&) = delete;
};
} // namespace Eigen
#endif // EIGEN_CXX11_THREADPOOL_EVENTCOUNT_H_
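// ---------------------------------------------------------------------------
// Usage sketch (added for illustration, not part of Eigen): the waiter-side
// protocol for the EventCount above; note that this version takes the Waiter
// only in CommitWait(), not in Prewait()/CancelWait(). The predicate is an
// assumed caller-supplied callable.
template <typename Predicate>
void AwaitOn(Eigen::EventCount& ec, Eigen::EventCount::Waiter* w,
             Predicate predicate) {
  while (!predicate()) {
    ec.Prewait();                        // announce intent to wait
    if (predicate()) { ec.CancelWait(); break; }
    ec.CommitWait(w);                    // park; re-check predicate on wakeup
  }
}
// Notifying side: make the predicate true first, then call
//   ec.Notify(false);                   // or Notify(true) to wake all waiters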
// https://pastebin.com/f1e159de5
#include "stdafx.h"
#include "../../relacy/relacy_std.hpp"
#include <cstdio>
#include <list>
namespace raii {
template<typename T>
class unguard {
T& m_guard;
public:
unguard(T& guard)
: m_guard(guard) {
m_guard.unlock();
}
~unguard() {
m_guard.lock();
}
};
template<typename T, typename Y>
class guard {
T& m_mtx;
Y& m_tid;
friend class unguard<guard>;
public:
guard(T& mtx, Y& tid)
: m_mtx(mtx), m_tid(tid) {
lock();
}
~guard() {
unlock();
}
Y& get_tid() const {
return m_tid;
}
private:
void lock() {
m_mtx.lock(m_tid);
}
void unlock() {
m_mtx.unlock(m_tid);
}
};
}
namespace pthread {
namespace sys {
static void spin_wait() {
rl::backoff wait;
wait.yield($);
}
class spinlock {
std::atomic<class thread*> m_state;
public:
void before() {
m_state($) = NULL;
}
void after() {
RL_ASSERT(! m_state($).load(std::memory_order_relaxed));
}
public:
typedef raii::guard<spinlock, class thread> guard;
typedef raii::unguard<guard> unguard;
public:
void lock(thread& tid) {
class thread* cmp = NULL;
while (! m_state($).compare_swap(cmp, &tid, std::memory_order_acquire)) {
spin_wait();
cmp = NULL;
}
}
void unlock(thread& tid) {
class thread* cmp = m_state($).swap(NULL, std::memory_order_release);
if (cmp != &tid) {
RL_ASSERT(cmp == &tid);
}
}
};
class thread {
unsigned const m_idx;
std::atomic<bool> m_signal;
public:
thread(unsigned const idx)
: m_idx(idx) {
m_signal($) = false;
}
void wait() {
// std::cout << "thread: " << this << " waiting\n";
while (! m_signal($).swap(false, std::memory_order_acquire)) {
spin_wait();
}
// std::cout << "thread: " << this << " signal consume\n";
}
void signal() {
m_signal($).store(true, std::memory_order_release);
// std::cout << "thread: " << this << " signalled\n";
}
};
class waitset {
spinlock m_mtx;
rl::var<unsigned> m_count;
std::list<thread*> m_threads;
public:
void before() {
m_count($) = 0;
m_mtx.before();
}
void after() {
RL_ASSERT(! m_threads.size());
m_mtx.after();
}
public:
void push(thread& tid) {
spinlock::guard lock(m_mtx, tid);
m_threads.push_back(&tid);
}
void pop(thread& tid) {
spinlock::guard lock(m_mtx, tid);
m_threads.remove(&tid);
}
void wait(thread& tid) {
spinlock::guard lock(m_mtx, tid);
if (! m_count($)) {
m_threads.push_back(&tid);
{
spinlock::unguard unlock(lock);
tid.wait();
}
m_threads.remove(&tid);
return;
}
--m_count($);
}
void signal(thread& tid, unsigned count = 0) {
spinlock::guard lock(m_mtx, tid);
m_count($) += count;
if (! m_threads.empty()) {
m_threads.front()->signal();
}
}
void broadcast(thread& tid, unsigned count = 0) {
spinlock::guard lock(m_mtx, tid);
m_count($) += count;
std::list<thread*>::iterator i;
for (i = m_threads.begin(); i != m_threads.end(); ++i) {
(*i)->signal();
}
}
};
} // namespace sys
class event {
std::atomic<bool> m_state;
sys::waitset m_wset;
public:
void before() {
m_state($) = false;
m_wset.before();
}
void after() {
m_wset.after();
}
public:
void set(sys::thread& tid) {
if (! m_state($).swap(true, std::memory_order_seq_cst)) {
m_wset.signal(tid, 1);
}
}
void wait(sys::thread& tid) {
bool cmp = true;
while (! m_state($).compare_swap(cmp, false, std::memory_order_acquire)) {
m_wset.wait(tid);
cmp = true;
}
}
};
class mutex {
enum constant {
UNLOCKED = 0,
LOCKED = 1,
CONTENTION = 2
};
std::atomic<int> m_state;
event m_event;
public:
void before() {
m_state($) = UNLOCKED;
m_event.before();
}
void after() {
m_event.after();
RL_ASSERT(m_state($).load(std::memory_order_relaxed) == UNLOCKED);
}
public:
typedef raii::guard<mutex, sys::thread> guard;
typedef raii::unguard<guard> unguard;
public:
void lock(sys::thread& tid) {
if (m_state($).swap(LOCKED, std::memory_order_acquire)) {
while (m_state($).swap(CONTENTION, std::memory_order_acquire)) {
m_event.wait(tid);
}
}
}
void unlock(sys::thread& tid) {
if (m_state($).swap(UNLOCKED, std::memory_order_release) == CONTENTION) {
m_event.set(tid);
}
}
};
class condvar {
sys::waitset m_wset;
std::atomic<unsigned> m_state;
public:
void before() {
m_state($) = 0;
m_wset.before();
}
void after() {
RL_ASSERT(! m_state($).load(std::memory_order_relaxed));
m_wset.after();
}
public:
void wait(mutex::guard& lock) {
sys::thread& tid = lock.get_tid();
m_state($).fetch_add(1, std::memory_order_relaxed);
m_wset.push(tid);
mutex::unguard unlock(lock);
tid.wait();
m_wset.pop(tid);
}
void signal(sys::thread& tid) {
unsigned cmp = m_state($).load(std::memory_order_relaxed);
do {
if (! cmp) { return; }
} while(! m_state($).compare_swap(cmp, cmp - 1, std::memory_order_relaxed));
m_wset.signal(tid);
}
void broadcast(sys::thread& tid) {
unsigned cmp = m_state($).load(std::memory_order_relaxed);
if (cmp) {
if (m_state($).swap(0, std::memory_order_relaxed)) {
m_wset.broadcast(tid);
}
}
}
};
} // namespace pthread
#define CONSUMER_COUNT 2
struct condvar_test : rl::test_suite<condvar_test, CONSUMER_COUNT + 1> {
pthread::mutex m_mtx;
pthread::condvar m_cond;
rl::var<bool> m_signal;
rl::var<bool> m_finish;
void before() {
m_mtx.before();
m_cond.before();
m_signal($) = false;
m_finish($) = false;
}
void after() {
RL_ASSERT(! m_signal($));
RL_ASSERT(! m_finish($));
m_cond.after();
m_mtx.after();
}
void producer(pthread::sys::thread& tid) {
unsigned count = 0;
// std::cout << "producer " << &tid << "\n";
do {
{
pthread::mutex::guard lock(m_mtx, tid);
m_signal($) = true;
}
m_cond.signal(tid);
pthread::mutex::guard lock(m_mtx, tid);
while (! m_finish($)) {
m_cond.wait(lock);
}
m_finish($) = false;
RL_ASSERT(! m_signal($));
++count;
} while (count < CONSUMER_COUNT);
}
void consumers(pthread::sys::thread& tid) {
// std::cout << "consumer " << &tid << "\n";
{
pthread::mutex::guard lock(m_mtx, tid);
while (! m_signal($)) {
m_cond.wait(lock);
}
m_signal($) = false;
m_finish($) = true;
}
m_cond.broadcast(tid);
}
void thread(unsigned tidx) {
pthread::sys::thread tid(tidx);
if (! tidx) { // producer
producer(tid);
} else { // consumers
consumers(tid);
}
}
};
int main() {
{
rl::test_params params;
params.search_type = rl::random_scheduler_type;
params.iteration_count = 1000;
rl::simulate<condvar_test>(params);
std::cout << rl::test_result_str(params.test_result) << std::endl;
}
std::puts("\n\n\nFINISHED!\n___________________________________\n\
press <ENTER> to exit...");
std::getchar();
return 0;
}
// https://pastebin.com/f4ec057ef
#include <relacy/relacy_std.hpp>
#include <cstdio>
#include <cstddef>
#if ! defined (NDEBUG)
# define DBG_PRINTF(mp_exp) std::printf mp_exp
#else
# define DBG_PRINTF(mp_exp) ((void)0)
#endif
class eventcount {
public:
typedef unsigned long key_type;
private:
mutable rl::atomic<key_type> m_count;
rl::mutex m_mutex;
rl::condition_variable m_cond;
void prv_signal(key_type key) {
if (key & 1) {
m_mutex.lock($);
while (! m_count($).compare_exchange_weak(key, (key + 2) & ~1,
rl::memory_order_seq_cst));
m_mutex.unlock($);
m_cond.notify_all($);
}
}
public:
eventcount() {
m_count($).store(0, rl::memory_order_relaxed);
}
public:
key_type get() const {
return m_count($).fetch_or(1, rl::memory_order_acquire);
}
void signal() {
prv_signal(m_count($).fetch_add(0, rl::memory_order_seq_cst));
}
void signal_relaxed() {
prv_signal(m_count($).load(rl::memory_order_relaxed));
}
void wait(key_type cmp) {
m_mutex.lock($);
if ((m_count($).load(rl::memory_order_seq_cst) & ~1) == (cmp & ~1)) {
m_cond.wait(m_mutex, $);
}
m_mutex.unlock($);
}
};
template<typename T, std::size_t T_size>
class mpsc_boundq {
rl::atomic<std::size_t> m_push;
rl::atomic<std::size_t> m_push_idx;
rl::atomic<bool> m_commit[T_size];
rl::var<std::size_t> m_pop_idx;
rl::var<T> m_buffer[T_size];
eventcount m_pop_ecount;
eventcount m_push_ecount;
private:
bool prv_try_push_strong() {
std::size_t push_ = m_push($).load(rl::memory_order_relaxed);
while (! m_push($).compare_exchange_weak(push_,
(push_) ? push_ - 1 : 0, rl::memory_order_seq_cst));
return (push_) ? true : false;
}
public:
mpsc_boundq() {
m_push($).store(T_size, rl::memory_order_relaxed);
m_push_idx($).store(T_size, rl::memory_order_relaxed);
m_pop_idx($) = 0;
for (std::size_t i = 0; i < T_size; ++i) {
m_buffer[i]($) = T();
m_commit[i]($).store(false, rl::memory_order_relaxed);
}
}
~mpsc_boundq() {
RL_ASSERT(m_push($).load(rl::memory_order_relaxed) == T_size);
for (std::size_t i = 0; i < T_size; ++i) {
RL_ASSERT(! m_commit[i]($).load(rl::memory_order_relaxed));
}
}
public:
void push(T const& state) {
while (! prv_try_push_strong()) {
eventcount::key_type const eckey = m_push_ecount.get();
if (prv_try_push_strong()) break;
m_push_ecount.wait(eckey);
}
std::size_t const i = m_push_idx($).fetch_add(1,
rl::memory_order_seq_cst) % T_size;
m_buffer[i]($) = state;
m_commit[i]($).store(true, rl::memory_order_seq_cst);
/* if following is: `m_pop_ecount.signal_relaxed();' == BOOM!!!!
;^D RELACY!!!
______________________________________________________________________*/
m_pop_ecount.signal();
// m_pop_ecount.signal_relaxed();
}
void pop(T& state) {
std::size_t const i = m_pop_idx($) % T_size;
++m_pop_idx($);
while (! m_commit[i]($).load(rl::memory_order_seq_cst)) {
eventcount::key_type const eckey = m_pop_ecount.get();
if (m_commit[i]($).load(rl::memory_order_seq_cst)) break;
m_pop_ecount.wait(eckey);
}
state = m_buffer[i]($);
m_commit[i]($).store(false, rl::memory_order_seq_cst);
m_push($).fetch_add(1, rl::memory_order_seq_cst);
m_push_ecount.signal_relaxed();
}
};
#define CONSUMERS 1
#define PRODUCERS 4
#define THREADS (CONSUMERS + PRODUCERS)
#define ITERS 6
#define BUFFER (PRODUCERS * ITERS)
struct buffer_test : rl::test_suite<buffer_test, THREADS> {
mpsc_boundq<unsigned, BUFFER / 2> m_buffer;
void thread(unsigned tidx) {
if (tidx < CONSUMERS) {
for (unsigned i = 0; i < BUFFER; ++i) {
m_buffer.pop(tidx);
DBG_PRINTF(("consumer loop: %u\n", tidx));
}
} else {
for (unsigned i = 0; i < ITERS * CONSUMERS; ++i) {
m_buffer.push(i);
DBG_PRINTF(("producer(%u) loop: %u\n", tidx, i));
}
}
}
};
int main() {
rl::test_params params;
params.iteration_count = 9999999;
rl::simulate<buffer_test>(params);
std::puts("\n\n\n_____________________________\nCompleted!");
std::getchar();
return 0;
}
/// https://pastebin.com/f72cc3cc1
namespace syncronization {
class ecount {
private long m_count = 0;
private long m_waiters = 0;
private static long InterlockedOr(ref long dest, long value) {
long cmp, cmptmp = dest;
do {
cmp = cmptmp;
cmptmp = System.Threading.Interlocked.CompareExchange(ref dest, cmp | value, cmp);
} while (cmp != cmptmp);
return cmp;
}
private void prv_signal(long cmptmp, bool broadcast) {
if ((cmptmp & 1) != 0) {
lock (this) {
long cmp;
do {
cmp = cmptmp;
long newval = cmp + 2;
if (m_waiters < 2 || broadcast) {
newval &= ~1;
}
cmptmp = System.Threading.Interlocked.CompareExchange(ref m_count, newval, cmp);
} while (cmp != cmptmp);
if (m_waiters > 0) {
if (m_waiters == 1 || ! broadcast) {
--m_waiters;
System.Threading.Monitor.Pulse(this);
} else {
m_waiters = 0;
System.Threading.Monitor.PulseAll(this);
}
}
}
}
}
public long get() {
return InterlockedOr(ref m_count, 1);
}
public void signal() {
long cmp = System.Threading.Thread.VolatileRead(ref m_count);
System.Threading.Thread.MemoryBarrier();
prv_signal(cmp, false);
}
public void signal_relaxed() {
long cmp = System.Threading.Thread.VolatileRead(ref m_count);
prv_signal(cmp, false);
}
public void broadcast() {
long cmp = System.Threading.Thread.VolatileRead(ref m_count);
System.Threading.Thread.MemoryBarrier();
prv_signal(cmp, true);
}
public void broadcast_relaxed() {
long cmp = System.Threading.Thread.VolatileRead(ref m_count);
prv_signal(cmp, true);
}
public void wait(long cmp1) {
lock (this) {
long cmp2 = System.Threading.Thread.VolatileRead(ref m_count) & ~1;
if ((cmp1 & ~1) == cmp2) {
++m_waiters;
System.Threading.Monitor.Wait(this);
}
}
}
};
class mpmcq<T> {
public class node {
public volatile node m_next = null;
public T m_state = default(T);
public node() {}
public node(T state) {
m_state = state;
}
};
volatile node m_head;
volatile node m_tail;
public mpmcq(node n) {
n.m_next = null;
m_head = m_tail = n;
}
public mpmcq() : this(new node()) {}
public void push(T state) {
node n = new node(state);
n.m_next = null;
node prev = System.Threading.Interlocked.Exchange(ref m_head, n);
prev.m_next = n;
}
public bool pop(out T state) {
node cmp, cmptmp = m_tail;
do {
cmp = cmptmp;
node next = cmp.m_next;
if (next == null) {
state = default(T);
return false;
}
state = next.m_state;
cmptmp = System.Threading.Interlocked.CompareExchange(ref m_tail, next, cmp);
} while (cmp != cmptmp);
return true;
}
};
class producer {
ecount m_ecount;
mpmcq<string> m_queue;
int m_max;
int m_id;
public producer(int id, ecount ecount, mpmcq<string> queue, int max) {
m_ecount = ecount;
m_queue = queue;
m_max = max;
m_id = id;
}
public void run() {
System.Console.WriteLine("Producer " + m_id + ", Running!");
for (int i = 0; i < m_max; ++i) {
m_queue.push("Producer " + m_id + ", Object " + i);
m_ecount.signal();
System.Threading.Thread.Sleep(1);
}
System.Console.WriteLine("Producer " + m_id + ", Finished!");
}
};
class consumer {
ecount m_ecount;
mpmcq<string> m_queue;
int m_id;
public static long g_mcount = 0;
public static long g_ccount = 0;
public consumer(int id, ecount ecount, mpmcq<string> queue) {
m_ecount = ecount;
m_queue = queue;
m_id = id;
}
public void run() {
System.Console.WriteLine("Consumer " + m_id + ", Running!");
string s;
do {
while (! m_queue.pop(out s)) {
long key = m_ecount.get();
if (m_queue.pop(out s)) break;
m_ecount.wait(key);
}
System.Console.WriteLine("Consumer " + m_id + ", Received Message: " + s);
} while (System.Threading.Interlocked.Decrement(ref g_mcount) > 0);
for (int i = 1; i < g_ccount; ++i) {
m_queue.push("Consumer " + m_id + ", SHUTDOWN " + i);
}
m_ecount.broadcast();
System.Console.WriteLine("Consumer " + m_id + ", Finished!");
}
};
class application {
ecount m_ecount = new ecount();
mpmcq<string> m_queue = new mpmcq<string>();
int m_pcount, m_ccount, m_mcount;
producer[] m_producers;
consumer[] m_consumers;
System.Threading.Thread[] m_threads;
void prv_get_params() {
get_producer_count:
try {
System.Console.Write("Number of producers: ");
m_pcount = System.Int32.Parse(System.Console.ReadLine());
if (m_pcount < 1) {
System.Console.WriteLine("Sorry, you need at least one producer!");
System.Media.SystemSounds.Exclamation.Play();
goto get_producer_count;
}
get_consumer_count:
System.Console.Write("Number of consumers: ");
m_ccount = System.Int32.Parse(System.Console.ReadLine());
if (m_ccount < 1) {
System.Console.WriteLine("Sorry, you need at least one consumer!");
System.Media.SystemSounds.Exclamation.Play();
goto get_consumer_count;
}
get_message_count:
System.Console.Write("Number of messages per-producer: ");
m_mcount = System.Int32.Parse(System.Console.ReadLine());
if (m_mcount < 1) {
System.Console.WriteLine("Sorry, you need at least one message per-producer!");
System.Media.SystemSounds.Exclamation.Play();
goto get_message_count;
}
} catch {
System.Console.WriteLine("Bad input; try again!");
System.Media.SystemSounds.Exclamation.Play();
goto get_producer_count;
}
consumer.g_mcount = m_pcount * m_mcount;
consumer.g_ccount = m_ccount;
System.Console.Clear();
}
void prv_create_threads() {
m_producers = new producer[m_pcount];
m_consumers = new consumer[m_ccount];
m_threads = new System.Threading.Thread[m_pcount + m_ccount];
for (int i = 0; i < m_pcount; ++i) {
m_producers[i] = new producer(i, m_ecount, m_queue, m_mcount);
}
for (int i = 0; i < m_ccount; ++i) {
m_consumers[i] = new consumer(i, m_ecount, m_queue);
}
for (int i = 0; i < m_ccount; ++i) {
m_threads[i] = new System.Threading.Thread(m_consumers[i].run);
}
for (int i = 0; i < m_pcount; ++i) {
m_threads[i + m_ccount] = new System.Threading.Thread(m_producers[i].run);
}
}
void prv_start_threads() {
for (int i = 0; i < m_pcount + m_ccount; ++i) m_threads[i].Start();
}
void prv_join_threads() {
for (int i = 0; i < m_pcount + m_ccount; ++i) m_threads[i].Join();
}
public application() {
prv_get_params();
prv_create_threads();
}
public void go() {
prv_start_threads();
prv_join_threads();
}
static void Main(string[] args) {
application app = new application();
app.go();
System.Console.WriteLine("\n\nPress <ENTER> to exit...");
System.Console.ReadLine();
}
};
}
class eventcount
{
public:
typedef unsigned long key_type;
private:
mutable std::atomic<key_type> m_count;
std::mutex m_mutex;
std::condition_variable m_cond;
private:
void prv_signal(key_type key)
{
if (key & 1)
{
m_mutex.lock($);
while (! m_count($).compare_exchange_weak(
key,
(key + 2) & ~1,
std::memory_order_seq_cst));
m_mutex.unlock($);
m_cond.notify_all($);
}
}
public:
eventcount()
: m_count(0)
{
}
public:
key_type get() const
{
return m_count($).fetch_or(1, std::memory_order_acquire);
}
void signal()
{
prv_signal(m_count($).fetch_add(0, std::memory_order_seq_cst));
}
void signal_relaxed()
{
prv_signal(m_count($).load(std::memory_order_relaxed));
}
void wait(key_type cmp)
{
m_mutex.lock($);
key_type cur = m_count($).load(std::memory_order_seq_cst);
if ((cur & ~1) == (cmp & ~1))
{
m_cond.wait(m_mutex, $);
}
m_mutex.unlock($);
}
};
#define RL_GC
#include <relacy/relacy_std.hpp>
#include <cstdio>
class eventcount {
public:
typedef unsigned long key_type;
private:
mutable rl::atomic<key_type> m_count;
rl::mutex m_mutex;
rl::condition_variable m_cond;
void prv_signal(key_type key) {
if (key & 1) {
m_mutex.lock($);
while (! m_count($).compare_exchange_weak(key, (key + 2) & ~1,
rl::memory_order_seq_cst));
m_mutex.unlock($);
m_cond.notify_all($);
}
}
public:
eventcount() {
m_count($).store(0, rl::memory_order_relaxed);
}
public:
key_type get() const {
return m_count($).fetch_or(1, rl::memory_order_acquire);
}
void signal() {
prv_signal(m_count($).fetch_add(0, rl::memory_order_seq_cst));
}
void signal_relaxed() {
prv_signal(m_count($).load(rl::memory_order_relaxed));
}
void wait(key_type cmp) {
m_mutex.lock($);
if ((m_count($).load(rl::memory_order_seq_cst) & ~1) == (cmp & ~1)) {
m_cond.wait(m_mutex, $);
}
m_mutex.unlock($);
}
};
template<typename T>
class mpmcq {
struct node {
rl::atomic<node*> m_next;
T volatile m_state;
};
rl::atomic<node*> m_head;
rl::atomic<node*> m_tail;
public:
mpmcq() {
node* n = RL_NEW(node);
n->m_next($).store(NULL, rl::memory_order_relaxed);
m_head($).store(n, rl::memory_order_relaxed);
m_tail($).store(n, rl::memory_order_relaxed);
}
~mpmcq() {
RL_ASSERT(m_head($).load(rl::memory_order_relaxed) ==
m_tail($).load(rl::memory_order_relaxed));
}
public:
void push(T& state) {
node* n = RL_NEW(node);
n->m_next($).store(NULL, rl::memory_order_relaxed);
n->m_state = state;
node* p = m_head($).exchange(n, rl::memory_order_seq_cst);
p->m_next($).store(n, rl::memory_order_seq_cst);
}
bool pop(T& state) {
node* n;
node* t = m_tail($).load(rl::memory_order_seq_cst);
do {
n = t->m_next($).load(rl::memory_order_seq_cst);
if (! n) return false;
state = n->m_state;
} while (! m_tail($).compare_exchange_weak(t, n, rl::memory_order_seq_cst));
return true;
}
};
#define PRODUCERS 2
#define CONSUMERS 2
#define THREADS (PRODUCERS + CONSUMERS)
#define ITERS 6
struct mpmcq_test : rl::test_suite<mpmcq_test, THREADS> {
eventcount m_ecount;
mpmcq<int> m_queue;
std::atomic<int> m_count;
void before() {
m_count($).store(PRODUCERS * ITERS, rl::memory_order_relaxed);
}
void invariant() {
RL_ASSERT(m_count($).load(rl::memory_order_relaxed) > -1);
}
void after() {
RL_ASSERT(! m_count($).load(rl::memory_order_relaxed));
}
void thread(unsigned tidx) {
int i;
if (tidx < PRODUCERS) {
for (i = 0; i < ITERS; ++i) {
m_queue.push(i);
m_ecount.signal();
}
} else {
do {
while (! m_queue.pop(i)) {
eventcount::key_type key = m_ecount.get();
if (m_queue.pop(i)) break;
m_ecount.wait(key);
}
} while (i != -666 &&
m_count($).fetch_add(-1, rl::memory_order_relaxed) != 1);
if (i != -666) {
for (i = 1; i < CONSUMERS; ++i) {
int x = -666;
m_queue.push(x);
}
m_ecount.signal();
}
}
}
};
int main() {
rl::test_params params;
params.iteration_count = 99999999;
//params.search_type = rl::fair_full_search_scheduler_type;
//params.search_type = rl::fair_context_bound_scheduler_type;
rl::simulate<mpmcq_test>(params);
std::puts("nnn____________________________________n"
"DONE!!!!npress to exit...");
std::fflush(stdin);
std::fflush(stdout);
std::getchar();
return 0;
}
/** Fine-grained Eventcount
* Copyright (C) 2008 Dmitriy S. V'jukov
*/
#include <cstddef> // for offsetof and size_t
#if defined(WIN32) && defined(_MSC_VER)
#include <windows.h>
#include <intrin.h>
class semaphore
{
public:
semaphore()
{
h_ = CreateSemaphore(0, 0, LONG_MAX, 0);
}
~semaphore()
{
CloseHandle(h_);
}
void wait()
{
WaitForSingleObject(h_, INFINITE);
}
void post()
{
ReleaseSemaphore(h_, 1, 0);
}
private:
HANDLE h_;
semaphore(semaphore const&);
semaphore& operator = (semaphore const&);
};
class mutex
{
public:
mutex()
{
InitializeCriticalSection(&cs_);
}
~mutex()
{
DeleteCriticalSection(&cs_);
}
void lock()
{
EnterCriticalSection(&cs_);
}
void unlock()
{
LeaveCriticalSection(&cs_);
}
private:
CRITICAL_SECTION cs_;
mutex(mutex const&);
mutex& operator = (mutex const&);
};
void full_memory_fence()
{
_mm_mfence();
}
#define THREAD_LOCAL __declspec(thread)
#elif defined(POSIX) && defined(GCC)
#include <semaphore.h>
#include <pthread.h>
class semaphore
{
public:
semaphore()
{
sem_init(&sem_, 0, 0);
}
~semaphore()
{
sem_destroy(&sem_);
}
void wait()
{
sem_wait(&sem_);
}
void post()
{
sem_post(&sem_);
}
private:
sem_t sem_;
semaphore(semaphore const&);
semaphore& operator = (semaphore const&);
};
class mutex
{
public:
mutex()
{
pthread_mutex_init(&mutex_, 0);
}
~mutex()
{
pthread_mutex_destroy(&mutex_);
}
void lock()
{
pthread_mutex_lock(&mutex_);
}
void unlock()
{
pthread_mutex_unlock(&mutex_);
}
private:
pthread_mutex_t mutex_;
mutex(mutex const&);
mutex& operator = (mutex const&);
};
void full_memory_fence()
{
__sync_synchronize();
}
#define THREAD_LOCAL __thread
#endif
class lock
{
public:
lock(mutex& m)
: m_(m)
{
m.lock();
}
~lock()
{
m_.unlock();
}
private:
mutex& m_;
lock(lock const&);
lock& operator = (lock const&);
};
/** simple single-threaded doubly-linked list
* nothing interesting
*/
class dlist
{
public:
struct node
{
node* prev_;
node* next_;
node()
{
prev_ = 0;
next_ = 0;
}
};
dlist()
{
reset();
}
void push(node* n)
{
size_ += 1;
n->next_ = head_.next_;
n->prev_ = &head_;
head_.next_->prev_ = n;
head_.next_ = n;
}
node* pop()
{
if (size_ == 0)
return 0;
node* n = head_.next_;
remove(n);
return n;
}
void remove(node* n)
{
size_ -= 1;
n->prev_->next_ = n->next_;
n->next_->prev_ = n->prev_;
}
size_t size() const
{
return size_;
}
node* begin()
{
return head_.next_;
}
void flush_to(dlist& target)
{
if (size_)
{
target.size_ = size_;
target.head_.next_ = head_.next_;
target.head_.next_->prev_ = &target.head_;
target.tail_.prev_ = tail_.prev_;
target.tail_.prev_->next_ = &target.tail_;
}
else
{
target.reset();
}
reset();
}
static bool not_last(node* n)
{
return n->next_ != 0;
}
static node* get_next(node* n)
{
return n->next_;
}
private:
size_t volatile size_;
node head_;
node tail_;
void reset()
{
size_ = 0;
head_.next_ = &tail_;
head_.prev_ = 0;
tail_.next_ = 0;
tail_.prev_ = &head_;
}
dlist(dlist const&);
dlist& operator = (dlist const&);
};
// -------
/** Fine-grained Eventcount
* Copyright (C) 2008 Dmitriy S. V'jukov
*/
/** per-thread descriptor for eventcount
*/
struct ec_thread
{
dlist::node node_;
semaphore sema_;
unsigned epoch_;
bool volatile in_waitset_;
bool spurious_;
void* ctx_;
ec_thread()
{
epoch_ = 0;
in_waitset_ = false;
spurious_ = false;
ctx_ = 0;
}
~ec_thread()
{
if (spurious_)
sema_.wait();
}
static ec_thread* current()
{
static THREAD_LOCAL ec_thread* ec_thread_instance = 0;
ec_thread* instance = ec_thread_instance;
if (instance == 0)
{
instance = new ec_thread;
ec_thread_instance = instance;
}
return instance;
// instance must be destroyed in DllMain() callback
// or in pthread_key_create() callback
}
private:
ec_thread(ec_thread const&);
ec_thread& operator = (ec_thread const&);
};
/** fine-grained eventcount implementation
*/
class eventcount
{
public:
eventcount()
{
epoch_ = 0;
}
void prepare_wait(void* ctx = 0)
{
ec_thread* th = ec_thread::current();
// this is a good place to pump a previous spurious wakeup
if (th->spurious_)
{
th->spurious_ = false;
th->sema_.wait();
}
th->in_waitset_ = true;
th->ctx_ = ctx;
{
lock l (mtx_);
th->epoch_ = epoch_;
waitset_.push(&th->node_);
}
full_memory_fence();
}
void wait()
{
ec_thread* th = ec_thread::current();
// this check is just an optimization
if (th->epoch_ == epoch_)
th->sema_.wait();
else
retire_wait();
}
void retire_wait()
{
ec_thread* th = ec_thread::current();
// a spurious wakeup will be pumped in the following prepare_wait()
th->spurious_ = true;
// try to remove node from waitset
if (th->in_waitset_)
{
lock l (mtx_);
if (th->in_waitset_)
{
// successfully removed from waitset,
// so there will be no spurious wakeup
th->in_waitset_ = false;
th->spurious_ = false;
waitset_.remove(&th->node_);
}
}
}
void notify_one()
{
full_memory_fence();
notify_one_relaxed();
}
template<typename predicate_t>
void notify(predicate_t pred)
{
full_memory_fence();
notify_relaxed(pred);
}
void notify_all()
{
full_memory_fence();
notify_all_relaxed();
}
void notify_one_relaxed()
{
if (waitset_.size() == 0)
return;
dlist::node* n;
{
lock l (mtx_);
epoch_ += 1;
n = waitset_.pop();
if (n)
to_ec_thread(n)->in_waitset_ = false;
}
if (n)
{
to_ec_thread(n)->sema_.post();
}
}
template<typename predicate_t>
void notify_relaxed(predicate_t pred)
{
if (waitset_.size() == 0)
return;
dlist temp;
{
lock l (mtx_);
epoch_ += 1;
size_t size = waitset_.size();
size_t idx = 0;
dlist::node* n = waitset_.begin();
while (dlist::not_last(n))
{
dlist::node* next = dlist::get_next(n);
ec_thread* th = to_ec_thread(n);
if (pred(th->ctx_, size, idx))
{
waitset_.remove(n);
temp.push(n);
th->in_waitset_ = false;
}
n = next;
idx += 1;
}
}
dlist::node* n = temp.begin();
while (dlist::not_last(n))
{
dlist::node* next = dlist::get_next(n);
to_ec_thread(n)->sema_.post();
n = next;
}
}
void notify_all_relaxed()
{
if (waitset_.size() == 0)
return;
dlist temp;
{
lock l (mtx_);
epoch_ += 1;
waitset_.flush_to(temp);
dlist::node* n = temp.begin();
while (dlist::not_last(n))
{
to_ec_thread(n)->in_waitset_ = false;
n = dlist::get_next(n);
}
}
dlist::node* n = temp.begin();
while (dlist::not_last(n))
{
dlist::node* next = dlist::get_next(n);
to_ec_thread(n)->sema_.post();
n = next;
}
}
private:
mutex mtx_;
dlist waitset_;
volatile unsigned epoch_;
ec_thread* to_ec_thread(dlist::node* n)
{
return (ec_thread*)((char*)n - offsetof(ec_thread, node_));
}
eventcount(eventcount const&);
eventcount& operator = (eventcount const&);
};
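// ---------------------------------------------------------------------------
// Usage sketch (added for illustration, not part of the original): a blocking
// pop on top of the fine-grained eventcount above. `Queue` and try_pop() are
// assumed names for some non-blocking work source.
template <typename Queue, typename T>
void blocking_pop(eventcount& ec, Queue& q, T& out) {
  for (;;) {
    if (q.try_pop(out)) return;      // fast path: predicate already true
    ec.prepare_wait();               // register in the waitset
    if (q.try_pop(out)) {            // re-check the predicate
      ec.retire_wait();              // cancel: we no longer need a wakeup
      return;
    }
    ec.wait();                       // block until a notify_*() call
  }
}
// Producer side: publish the item first, then wake a single waiter:
//   q.push(item);
//   ec.notify_one();                // the relaxed variants need an external fence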