Created
August 9, 2014 18:27
-
-
Save rsms/5b112b7c4ec13b64177b to your computer and use it in GitHub Desktop.
rx async group break-out
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "asyncgroup.hh" | |
using namespace rx; | |
AsyncCanceler DoThingAsync(Thing); | |
AsyncCanceler DoManyThingsAsync(Things things, func<void(Error)> cb) { | |
AsyncGroup G{cb}; | |
for (auto& thing : things) { | |
// Capture `job` in closure and assign DoThingAsync canceler to the job | |
auto* job = G.begin(); | |
*job = DoThingAsync(thing, [=](Error err) { | |
G.end(job, err); | |
}); | |
} | |
return G.canceler(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "func.hh" | |
namespace rx { | |
// Returned from asynchronous tasks which can be canceled by calling this object.
// A canceler can be called multiple times by multiple threads w/o any side effects.
using AsyncCanceler = func<void()>;
} // namespace |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include "asyncgroup.hh" | |
// using std::cerr; | |
// using std::endl; | |
// #define DBG(...) std::cout << "[" << rx::cx_basename(__FILE__) << "] " << __VA_ARGS__ << endl; | |
#define DBG(...) | |
#define fwdarg(a) ::std::forward<decltype(a)>(a) | |
namespace rx { | |
// Last-chance completion: if the end flag has not been claimed yet (no end() finished the group
// and cancel() was never called), the callback is invoked from the destructor. Jobs that are
// still registered at this point are reported as an error; otherwise the callback gets "no error".
_AsyncGroup::~_AsyncGroup() {
  once(_endFlag, [this] {
    if (_jobs.empty()) {
      _cb(nullptr);
      return;
    }
    _cb({"Some jobs did not end properly."
         " Call `asyncGroup.end(job[, error])` to mark a job as completed"});
  });
}
// Registers a new job under the next sequential ID and returns a pointer to the stored entry.
// The pointer is what callers later pass to end().
const AsyncGroup::Job* AsyncGroup::begin() const {
  const auto jobID = self->_nextJobID++;
  DBG("AS@"<<(void*)self<<" begin job #"<<jobID)
  auto inserted = self->_jobs.emplace(jobID);
  return &*inserted.first;
}
// Stores `canceler` (moved) as this job's canceler.
// Jobs are only reachable through const pointers handed out by AsyncGroup::begin(), hence the
// const-qualified operator and the const_cast.
const AsyncGroup::Job& AsyncGroup::Job::operator=(AsyncCanceler&& canceler) const {
  DBG("assign canceler to job #"<<_jobID)
  auto* mutableJob = const_cast<Job*>(this);
  mutableJob->_canceler = std::move(canceler);
  return *this;
}
// Stores a copy of `canceler` as this job's canceler.
// Jobs are only reachable through const pointers handed out by AsyncGroup::begin(), hence the
// const-qualified operator and the const_cast.
const AsyncGroup::Job& AsyncGroup::Job::operator=(const AsyncCanceler& canceler) const {
  DBG("assign canceler to job #"<<_jobID)
  auto* mutableJob = const_cast<Job*>(this);
  mutableJob->_canceler = canceler;
  return *this;
}
void AsyncGroup::end(const Job* job, Error err) const { | |
DBG("AS@"<<(void*)self<<" end job #"<<job->_jobID) | |
auto& jobs = self->_jobs; | |
auto I = jobs.find(*job); | |
if (I == jobs.end()) { | |
// Should never happen. If we have been canceled, then the job should have been canceled too. | |
err = {"Trying to end job #" + std::to_string(job->_jobID) + " that has already ended"}; | |
} else { | |
jobs.erase(I); | |
} | |
if (err || jobs.empty()) { | |
once(self->_endFlag, [&]{ | |
if (err) self->cancelAllJobs(); | |
self->_cb(err); | |
}); | |
} | |
} | |
// Invokes the canceler of every job still in the set. Jobs without a canceler are skipped.
// The jobs themselves are left in the set.
void _AsyncGroup::cancelAllJobs() {
  DBG("AS@"<<(void*)this<<" canceling all jobs")
  for (const auto& job : _jobs) {
    if (!job._canceler) continue;
    DBG("AS@"<<(void*)this<<" canceling job #"<<job._jobID)
    job._canceler();
  }
}
bool AsyncGroup::cancel() const { // returns true if the call caused the set to be canceled. Thread safe. | |
bool didCancel = false; | |
once(self->_endFlag, [&]{ | |
self->cancelAllJobs(); | |
didCancel = true; | |
}); | |
return didCancel; | |
} | |
// Builds a canceler closure that owns its own reference to this group.
// The first call cancels the group and drops that reference; later calls find the reference
// empty and do nothing.
AsyncCanceler AsyncGroup::canceler() {
  AsyncGroup held = *this;
  return [held] {
    if (!held) return;
    held.cancel();
    held.resetSelf();
  };
}
} // namespace |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
#include "ref.hh" | |
#include "once.hh" | |
#include "error.hh" | |
#include "asynccanceler.hh" | |
namespace rx { | |
using std::string; | |
// ---------------------------------------------------------------------------------------------- | |
struct _AsyncGroup; | |
using AsyncGroupID = size_t; | |
// Reference-counted handle to a group of asynchronous jobs that report through one callback.
struct AsyncGroup : Ref<_AsyncGroup> {
  // Opaque type that can be assigned AsyncCanceler
  struct Job;

  // `cb` is the group's completion callback.
  AsyncGroup(func<void(Error)> cb);
  AsyncGroup(const AsyncGroup&);
  AsyncGroup(AsyncGroup&&);

  // Cancel any incomplete job. Returns true if the call caused the set to be canceled.
  // After calling this with a true return value, the callback passed when constructing the
  // AsyncGroup is guaranteed *not* to be called. This is also true for any in-flight jobs:
  // their callbacks won't be called either. Thread safe.
  bool cancel() const;

  // A canceler. Essentially just a wrapper around `cancel()` with the addition of holding a
  // reference to this AsyncGroup until either disposed or called. Just like any AsyncCanceler,
  // subsequent calls have no effect.
  AsyncCanceler canceler();

  // Start a new job
  const Job* begin() const;

  // End a job
  void end(const Job*, Error err=nullptr) const;
};
/* Example: | |
AsyncCanceler DoThingAsync(Thing, func<void(Error)>);
AsyncCanceler DoManyThingsAsync(Things things, func<void(Error)> cb) { | |
AsyncGroup G{cb}; | |
for (auto& thing : things) { | |
// Capture `job` in closure and assign DoThingAsync canceler to the job | |
auto* job = G.begin(); | |
*job = DoThingAsync(thing, [=](Error err) { | |
G.end(job, err); | |
}); | |
} | |
return G.canceler(); | |
} | |
*/ | |
// =============================================================================================== | |
// One entry in an AsyncGroup's job set: a numeric ID plus the (optional) canceler for the job.
struct AsyncGroup::Job {
  Job() {}
  Job(size_t jobID) : _jobID{jobID} {}

  // Movable but not copyable.
  Job(const Job&) = delete;
  Job(Job&&) = default;

  // Assigning a canceler is const-qualified because jobs are handed out as `const Job*`.
  const Job& operator=(AsyncCanceler&&) const;
  const Job& operator=(const AsyncCanceler&) const;

  size_t        _jobID = SIZE_MAX;  // SIZE_MAX until a real ID is given
  AsyncCanceler _canceler;          // empty until assigned
};
// Shared state behind an AsyncGroup handle.
struct _AsyncGroup : SafeRefCounted<_AsyncGroup> {
  // Jobs are identified solely by their ID: equality and hashing ignore the canceler.
  struct JobEQ {
    constexpr bool operator()(const AsyncGroup::Job& lhs, const AsyncGroup::Job& rhs) const {
      return lhs._jobID == rhs._jobID;
    }
  };
  struct JobHash {
    constexpr size_t operator()(const AsyncGroup::Job& v) const {
      return v._jobID;
    }
  };
  using JobSet = std::unordered_set<AsyncGroup::Job,JobHash,JobEQ>;

  _AsyncGroup(func<void(Error)> cb) : _cb{cb} {}
  ~_AsyncGroup();

  // Calls the canceler of every job currently in `_jobs`.
  void cancelAllJobs();

  func<void(Error)> _cb;             // group callback
  JobSet            _jobs;           // jobs begun and not yet ended
  size_t            _nextJobID = 0;  // ID handed to the next job
  OnceFlag          _endFlag;        // guards the one-time end/cancel transition
};
inline AsyncGroup::AsyncGroup(func<void(Error)> cb) : Ref{new _AsyncGroup{cb}} {} | |
inline AsyncGroup::AsyncGroup(const AsyncGroup& rhs) : Ref{rhs} {} | |
inline AsyncGroup::AsyncGroup(AsyncGroup&& rhs) : Ref{rhs} {} | |
} // namespace |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
namespace rx { | |
// Error type which has a very low cost when there's no error.
//
// - When representing "no error" (Error::OK()) the object is just a null pointer.
//
// - When representing an error, a single memory allocation is incurred at construction. That
//   block holds the error code (first sizeof(Code) bytes) followed by the message bytes and a
//   terminating NUL sentinel byte.
//
struct Error {
  using Code = uint32_t;

  static Error OK(); // == OK (no error)

  // Every constructor below that takes a code and/or a message produces an *error* value,
  // including code 0 and an empty message.
  Error(Code);
  Error(int);
  Error(Code, const std::string& error_message);
  Error(int, const std::string& error_message);
  Error(const std::string& error_message);   // code is 0
  Error() noexcept;                // == OK (no error)
  Error(std::nullptr_t) noexcept;  // == OK (no error)
  ~Error();

  bool ok() const;              // true if no error
  Code code() const;            // 0 when OK
  const char* message() const;  // "" when OK; otherwise valid as long as this object's state
  operator bool() const;        // == !ok() -- true when representing an error

  Error(const Error& other);
  Error(Error&& other) noexcept;        // leaves `other` OK
  Error& operator=(const Error& s);
  Error& operator=(Error&& s) noexcept; // exchanges state with `s`

// ------------------------------------------------------------------------------------------------
private:
  // Allocates a fresh state block. Does not free a previous one: constructors only.
  void _set_state(Code code, const std::string& message);
  // Returns a newly allocated duplicate of a state block, or null for null.
  static const char* _copy_state(const char* other);

  // OK status has a NULL _state. Otherwise, _state is:
  //   _state[0..sizeof(Code)]  == code
  //   _state[sizeof(Code)..]   == message c-string
  const char* _state;
};

inline Error::Error() noexcept : _state{nullptr} {} // == OK
inline Error::Error(std::nullptr_t) noexcept : _state{nullptr} {} // == OK
inline Error::~Error() { delete[] _state; }  // delete[] on null is a no-op
inline Error Error::OK() { return Error{}; }
inline bool Error::ok() const { return _state == nullptr; }
inline Error::operator bool() const { return _state != nullptr; }

inline Error::Code Error::code() const {
  Code value = 0;
  // The code is copied out byte-wise rather than read through a casted pointer.
  if (_state) std::memcpy(&value, _state, sizeof(Code));
  return value;
}

inline const char* Error::message() const {
  return _state ? _state + sizeof(Code) : "";
}

// Prints "OK" for no error, otherwise "<message> (#<code>)".
inline std::ostream& operator<< (std::ostream& os, const Error& e) {
  if (!e) {
    return os << "OK";
  } else {
    return os << e.message() << " (#" << static_cast<int>(e.code()) << ')';
  }
}

inline const char* Error::_copy_state(const char* other) {
  if (other == nullptr) {
    return nullptr;
  }
  // code + message + NUL
  const std::size_t size = sizeof(Code) + std::strlen(other + sizeof(Code)) + 1;
  char* state = new char[size];
  std::memcpy(state, other, size);
  return state;
}

inline void Error::_set_state(Code code, const std::string& message) {
  const std::size_t length = message.size();
  char* state = new char[sizeof(Code) + length + 1];
  std::memcpy(state, &code, sizeof(Code));
  std::memcpy(state + sizeof(Code), message.data(), length);
  state[sizeof(Code) + length] = '\0';
  _state = state;
}

inline Error::Error(const Error& other) : _state{_copy_state(other._state)} {}

inline Error::Error(Error&& other) noexcept : _state{other._state} {
  other._state = nullptr;
}

inline Error::Error(Code code) : _state{nullptr} {
  _set_state(code, std::string{});
}

inline Error::Error(int code) : Error{static_cast<Code>(code)} {}

inline Error::Error(Code code, const std::string& msg) : _state{nullptr} {
  _set_state(code, msg);
}

inline Error::Error(int code, const std::string& msg) : Error{static_cast<Code>(code), msg} {}

inline Error::Error(const std::string& message) : _state{nullptr} {
  _set_state(0, message);
}

inline Error& Error::operator=(const Error& other) {
  if (_state != other._state) {
    // Duplicate first: if the allocation throws, this object still owns its old state.
    const char* fresh = _copy_state(other._state);
    delete[] _state;
    _state = fresh;
  }
  return *this;
}

inline Error& Error::operator=(Error&& other) noexcept {
  if (_state != other._state) {
    // The previous state ends up in `other` and is released by its destructor.
    std::swap(other._state, _state);
  }
  return *this;
}
} // namespace |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
namespace rx { | |
// Shorthand for std::function with the given call signature, e.g. func<void(Error)>.
template <class Signature>
using func = std::function<Signature>;
} // namespace |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
namespace rx { | |
// State for once(): 0 until some caller wins the right to run, 1 afterwards.
// Atomic so that the fast-path read below is well-defined when threads race.
struct OnceFlag { std::atomic<long> s{0}; };

// Perform f exactly once with no risk of thread race conditions.
//
// The first caller to flip the flag from 0 to 1 runs `f`; every other caller returns immediately
// without running it. Note that callers that lose do not wait for the winner's `f` to finish.
//
// Example:
//   static OnceFlag flag; once(flag, []{ cerr << "happens exactly once" << endl; });
template<class F> inline void once(OnceFlag& pred, F f) {
  // Cheap read first so the common "already done" case skips the read-modify-write.
  if (pred.s.load(std::memory_order_acquire) != 0L) return;
  long expected = 0L;
  if (pred.s.compare_exchange_strong(expected, 1L)) f();
}
} // namespace |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_set>
#include <utility>
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#pragma once | |
namespace rx { | |
// Reference-count policy using plain (non-atomic) arithmetic.
struct PlainRefCounter {
  using value_type = uint32_t;

  // Adds one reference.
  static void retain(value_type& count) { count += 1; }

  // Drops one reference; returns true when the count reaches zero.
  static bool release(value_type& count) {
    count -= 1;
    return count == 0;
  }
};
// Reference-count policy that updates the counter with the compiler's __sync atomic builtins.
struct AtomicRefCounter {
  using value_type = volatile uint32_t;

  // Atomically adds one reference.
  static void retain(value_type& count) { __sync_fetch_and_add(&count, 1); }

  // Atomically drops one reference; returns true when the count reaches zero
  // (i.e. the value seen just before the decrement was 1).
  static bool release(value_type& count) { return __sync_fetch_and_sub(&count, 1) == 1; }
};
// Intrusive smart pointer. `T` must provide `retainRef()` and `releaseRef()`; a Ref holding a
// non-null pointer owns exactly one reference and gives it back on destruction or reset.
template <typename T>
struct Ref {
  T* self;

  Ref() noexcept : self{nullptr} {}
  Ref(std::nullptr_t) noexcept : self{nullptr} {}

  // Adopts `p`. By default the reference the caller already owns is taken over as-is; pass
  // add_ref=true to retain an additional one instead.
  explicit Ref(T* p, bool add_ref=false) : self{p} { if (add_ref && p) { self->retainRef(); } }

  Ref(const Ref& rhs) : self{rhs.self} { if (self) self->retainRef(); }
  Ref(const Ref* rhs) : self{rhs->self} { if (self) self->retainRef(); }  // `rhs` must not be null

  // Steals the reference held by `rhs`, leaving it empty.
  Ref(Ref&& rhs) noexcept : self{rhs.self} { rhs.self = nullptr; }

  ~Ref() { if (self) self->releaseRef(); }

  // Drops the held reference (if any) and becomes empty. Callable on a const Ref so that
  // closures capturing a Ref by value can release it.
  void resetSelf(std::nullptr_t=nullptr) const {
    if (T* old = self) {
      // Clear the pointer before releasing, matching resetSelf(const T*) below, so this Ref
      // never points at an object whose last reference has just been dropped.
      const_cast<Ref*>(this)->self = nullptr;
      old->releaseRef();
    }
  }

  // Re-points this Ref at `p`. The new object is retained before the old one is released, which
  // keeps `resetSelf(self)` safe.
  Ref& resetSelf(const T* p) const {
    T* old = self;
    const_cast<Ref*>(this)->self = const_cast<T*>(p);
    if (self) self->retainRef();
    if (old) old->releaseRef();
    return *const_cast<Ref*>(this);
  }

  Ref& operator=(const Ref& rhs) { return resetSelf(rhs.self); }
  Ref& operator=(T* rhs) { return resetSelf(rhs); }
  Ref& operator=(const T* rhs) { return resetSelf(rhs); }

  // The nullptr overload of resetSelf returns void, so its result cannot be returned here.
  Ref& operator=(std::nullptr_t) {
    resetSelf();
    return *this;
  }

  // Releases the current object (unless both sides already share it) and takes over rhs's.
  Ref& operator=(Ref&& rhs) noexcept {
    if (self != rhs.self && self) {
      self->releaseRef();
      self = nullptr;
    }
    std::swap(self, rhs.self);
    return *this;
  }

  T* operator->() const { return self; }
  operator bool() const { return self != nullptr; }
};
template <typename T, class C> | |
struct RefCounted { | |
typename C::value_type __refc = 1; | |
virtual ~RefCounted() = default; | |
void retainRef() { C::retain(__refc); } | |
bool releaseRef() { return C::release(__refc) && ({ delete this; true; }); } | |
using Ref = Ref<T>; | |
}; | |
template <typename T> using UnsafeRefCounted = RefCounted<T, PlainRefCounter>; | |
template <typename T> using SafeRefCounted = RefCounted<T, AtomicRefCounter>; | |
// Example: | |
// | |
// struct ReqBase { | |
// uv_fs_t uvreq; | |
// }; | |
// | |
// struct StatReq final : ReqBase, SafeRefCounted<StatReq> { | |
// StatReq(StatCallback&& cb) : Req{}, cb{fwdarg(cb)} {} | |
// StatCallback cb; | |
// }; | |
// | |
} // namespace |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment