Skip to content

Instantly share code, notes, and snippets.

@rsms
Created August 9, 2014 18:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rsms/5b112b7c4ec13b64177b to your computer and use it in GitHub Desktop.
rx async group break-out
#include "asyncgroup.hh"
using namespace rx;
AsyncCanceler DoThingAsync(Thing);
AsyncCanceler DoManyThingsAsync(Things things, func<void(Error)> cb) {
AsyncGroup G{cb};
for (auto& thing : things) {
// Capture `job` in closure and assign DoThingAsync canceler to the job
auto* job = G.begin();
*job = DoThingAsync(thing, [=](Error err) {
G.end(job, err);
});
}
return G.canceler();
}
#pragma once
#include "func.hh"
namespace rx {
// Callable that cancels an asynchronous task.
// Returned from asynchronous tasks which can be canceled by calling this object.
// A canceler can be called multiple times by multiple threads w/o any side effects.
using AsyncCanceler = func<void()>;
} // namespace
#include "asyncgroup.hh"
// using std::cerr;
// using std::endl;
// #define DBG(...) std::cout << "[" << rx::cx_basename(__FILE__) << "] " << __VA_ARGS__ << endl;
#define DBG(...)
#define fwdarg(a) ::std::forward<decltype(a)>(a)
namespace rx {
// If the group dies before completing, fire the callback exactly once:
// with no error when every job ended, otherwise with a diagnostic error.
_AsyncGroup::~_AsyncGroup() {
  once(_endFlag, [&]{
    if (_jobs.empty()) {
      _cb(nullptr);
    } else {
      _cb({"Some jobs did not end properly."
           " Call `asyncGroup.end(job[, error])` to mark a job as completed"});
    }
  });
}
// Registers a new job in the group's job set and returns its handle.
// The handle stays valid until the matching end() call erases it.
const AsyncGroup::Job* AsyncGroup::begin() const {
  const auto jobID = self->_nextJobID++;
  DBG("AS@"<<(void*)self<<" begin job #"<<jobID)
  auto inserted = self->_jobs.emplace(jobID);
  return &*inserted.first;
}
// Attach the task's canceler to this job by moving it in. The method is
// const because Jobs live inside a set; only the non-key _canceler field
// is mutated, via const_cast.
const AsyncGroup::Job& AsyncGroup::Job::operator=(AsyncCanceler&& canceler) const {
  DBG("assign canceler to job #"<<_jobID)
  auto* mutableThis = const_cast<Job*>(this);
  mutableThis->_canceler = std::move(canceler);
  return *this;
}
// Attach the task's canceler to this job by copying it. Const for the same
// set-membership reason as the move overload.
// FIX: the original used fwdarg() on a const lvalue reference, which collapses
// to a plain copy anyway — write the copy explicitly instead of a misleading
// std::forward.
const AsyncGroup::Job& AsyncGroup::Job::operator=(const AsyncCanceler& canceler) const {
  DBG("assign canceler to job #"<<_jobID)
  const_cast<Job*>(this)->_canceler = canceler;
  return *this;
}
// Marks `job` as completed. On the first error — or when the last job ends —
// the group finishes: remaining jobs are canceled and the callback fires.
// The once-flag guarantees the callback/cancel path runs at most one time.
// NOTE(review): `jobs` is searched and erased without a lock; thread safety
// appears to rest entirely on `once(_endFlag, ...)` — confirm that concurrent
// end() calls on distinct jobs are serialized by callers.
void AsyncGroup::end(const Job* job, Error err) const {
DBG("AS@"<<(void*)self<<" end job #"<<job->_jobID)
auto& jobs = self->_jobs;
// Jobs hash and compare on _jobID only, so *job looks itself up by ID.
auto I = jobs.find(*job);
if (I == jobs.end()) {
// Should never happen. If we have been canceled, then the job should have been canceled too.
err = {"Trying to end job #" + std::to_string(job->_jobID) + " that has already ended"};
} else {
jobs.erase(I);
}
// Finish on first error or when the set drained; once() makes this idempotent.
if (err || jobs.empty()) {
once(self->_endFlag, [&]{
if (err) self->cancelAllJobs();
self->_cb(err);
});
}
}
// Invokes the canceler of every still-registered job. Jobs whose canceler
// has not been assigned yet are skipped. Callers guard this with _endFlag.
void _AsyncGroup::cancelAllJobs() {
  DBG("AS@"<<(void*)this<<" canceling all jobs")
  for (auto& job : _jobs) {
    if (!job._canceler) {
      continue;
    }
    DBG("AS@"<<(void*)this<<" canceling job #"<<job._jobID)
    job._canceler();
  }
}
// Returns true only for the single call that actually canceled the set;
// later (or concurrent) calls are no-ops. Thread safe via the once-flag.
bool AsyncGroup::cancel() const {
  bool didCancel = false;
  once(self->_endFlag, [&]{
    didCancel = true;
    self->cancelAllJobs();
  });
  return didCancel;
}
// Builds a canceler that keeps this AsyncGroup alive (by holding a reference
// copy) until it is invoked or destroyed. Invoking it more than once has no
// effect: the first call drops the held reference.
AsyncCanceler AsyncGroup::canceler() {
  AsyncGroup held{*this};
  return [held]{
    if (!held) {
      return;
    }
    held.cancel();
    held.resetSelf();
  };
}
} // namespace
#pragma once
#include "ref.hh"
#include "once.hh"
#include "error.hh"
#include "asynccanceler.hh"
namespace rx {
using std::string;
// ----------------------------------------------------------------------------------------------
struct _AsyncGroup;
using AsyncGroupID = size_t;
// Reference-counted handle to a group of asynchronous jobs sharing one
// completion callback. Copying an AsyncGroup copies the reference, not the
// group; the underlying state lives in _AsyncGroup.
struct AsyncGroup : Ref<_AsyncGroup> {
struct Job; // opaque type that can be assigned AsyncCanceler
AsyncGroup(func<void(Error)> cb);
// `cb` fires exactly once: with no error when all jobs ended cleanly, or with
// the first error reported through end().
AsyncGroup(const AsyncGroup&);
AsyncGroup(AsyncGroup&&);
bool cancel() const;
// Cancel any incomplete job. Returns true if the call caused the set to be canceled.
// After calling this with a true return value, the callback passed when constructing the
// AsyncGroup is guaranteed *not* to be called. This is also true for any in-flight jobs:
// their callbacks won't be called either. Thread safe.
AsyncCanceler canceler();
// A canceler. Essentially just a wrapper around `cancel()` with the addition of holding a
// reference to this AsyncGroup until either disposed or called. Just like any AsyncCanceler,
// subsequent calls have no effect.
const Job* begin() const;
// Start a new job. Assign the task's AsyncCanceler to the returned Job.
void end(const Job*, Error err=nullptr) const;
// End a job. A non-OK `err` finishes the whole group with that error.
};
/* Example:
AsyncCanceler DoThingAsync(Thing, func<void(Error)> cb);
AsyncCanceler DoManyThingsAsync(Things things, func<void(Error)> cb) {
AsyncGroup G{cb};
for (auto& thing : things) {
// Capture `job` in closure and assign DoThingAsync canceler to the job
auto* job = G.begin();
*job = DoThingAsync(thing, [=](Error err) {
G.end(job, err);
});
}
return G.canceler();
}
*/
// ===============================================================================================
// Handle for one in-flight job. Stored in an unordered_set keyed on _jobID;
// the canceler is assigned after the task has started (see begin()).
struct AsyncGroup::Job {
  Job() {}
  Job(const Job&) = delete;  // identity type: never copied
  Job(Job&&) = default;
  // FIX: explicit — prevents accidental implicit size_t -> Job conversions;
  // set.emplace(jobID) and direct construction still work.
  explicit Job(size_t jobID) : _jobID{jobID} {}
  // Both overloads attach a canceler to a set-resident (const) Job.
  const Job& operator=(AsyncCanceler&&) const;
  const Job& operator=(const AsyncCanceler&) const;
  size_t _jobID = SIZE_MAX;  // SIZE_MAX == unassigned
  AsyncCanceler _canceler;   // cancels the underlying task; may be empty
};
// Shared, ref-counted state behind AsyncGroup.
struct _AsyncGroup : SafeRefCounted<_AsyncGroup> {
  // Jobs hash and compare on _jobID only.
  struct JobEQ {
    constexpr bool operator()(const AsyncGroup::Job& lhs, const AsyncGroup::Job& rhs) const {
      return lhs._jobID == rhs._jobID;
    }
  };
  struct JobHash {
    constexpr size_t operator()(const AsyncGroup::Job& v) const { return v._jobID; }
  };
  using JobSet = std::unordered_set<AsyncGroup::Job,JobHash,JobEQ>;
  // FIX: move the callback instead of copying the std::function (avoids a
  // potential allocation; `cb` is already a by-value sink parameter).
  _AsyncGroup(func<void(Error)> cb) : _cb{std::move(cb)} {}
  ~_AsyncGroup();
  void cancelAllJobs();
  func<void(Error)> _cb;  // completion callback; fired at most once
  JobSet _jobs;           // jobs still in flight
  size_t _nextJobID = 0;  // monotonically increasing ID source
  OnceFlag _endFlag;      // guards the one-shot completion/cancel path
};
// Creates the shared state; Ref adopts the fresh +1 reference.
inline AsyncGroup::AsyncGroup(func<void(Error)> cb) : Ref{new _AsyncGroup{cb}} {}
inline AsyncGroup::AsyncGroup(const AsyncGroup& rhs) : Ref{rhs} {}
// FIX: `Ref{rhs}` bound the COPY constructor (an extra atomic retain/release
// pair and rhs kept its reference); forward as an rvalue so Ref's move ctor
// actually steals the pointer.
inline AsyncGroup::AsyncGroup(AsyncGroup&& rhs) : Ref{std::move(rhs)} {}
} // namespace
#pragma once
namespace rx {
// Error type which has a very low cost when there's no error.
//
// - When representing "no error" (Error::OK()) the code generated is simply a pointer-sized int.
//
// - When representing an error, a single memory allocation is incured at construction which itself
// represents both the error code (first byte) as well as any message (remaining bytes) terminated
// by a NUL sentinel byte.
//
struct Error {
using Code = uint32_t;
static Error OK(); // == OK (no error)
Error(Code); // code only, empty message
Error(int);
Error(Code, const std::string& error_message);
Error(int, const std::string& error_message);
Error(const std::string& error_message); // code 0 + message
Error() noexcept; // == OK (no error)
Error(std::nullptr_t) noexcept; // == OK (no error)
~Error();
bool ok() const; // true if no error
Code code() const; // 0 when ok()
const char* message() const; // "" when ok()
operator bool() const; // == !ok() -- true when representing an error
Error(const Error& other); // deep copy
Error(Error&& other); // steals state; other becomes OK
Error& operator=(const Error& s);
Error& operator=(Error&& s);
// ------------------------------------------------------------------------------------------------
private:
void _set_state(Code code, const std::string& message);
static const char* _copy_state(const char* other);
// OK status has a NULL _state. Otherwise, _state is:
// _state[0..sizeof(Code)] == code
// _state[sizeof(Code)..] == message c-string
const char* _state;
};
inline Error::Error() noexcept : _state{NULL} {} // == OK
inline Error::Error(std::nullptr_t _) noexcept : _state{NULL} {} // == OK
inline Error::~Error() { if (_state) delete[] _state; }
inline Error Error::OK() { return Error{}; }
inline bool Error::ok() const { return !_state; }
inline Error::operator bool() const { return _state; } // true == is an error
// NOTE(review): reading the code via *(Code*)_state relies on new[] alignment;
// technically a strict-aliasing gray area — a memcpy-based read would be safer.
inline Error::Code Error::code() const { return _state ? *(Code*)_state : 0; }
inline const char* Error::message() const { return _state ? &_state[sizeof(Code)] : ""; }
inline std::ostream& operator<< (std::ostream& os, const Error& e) {
if (!e) {
return os << "OK";
} else {
return os << e.message() << " (#" << (int)e.code() << ')';
}
}
inline const char* Error::_copy_state(const char* other) {
if (other == 0) {
return 0;
} else {
size_t size = strlen(&other[sizeof(Code)]) + sizeof(Code) + 1;
char* state = new char[size];
memcpy(state, other, size);
return state;
}
}
// Allocates and fills _state: the code in the first sizeof(Code) bytes,
// then the message, then a NUL sentinel. Caller must not already own state.
inline void Error::_set_state(Code code, const std::string& message) {
  char* state = new char[sizeof(Code) + message.size() + 1];
  // FIX: memcpy instead of `*(Code*)state = code` — writing through a casted
  // pointer into raw char storage violates strict aliasing.
  memcpy(state, &code, sizeof(Code));
  memcpy(state + sizeof(Code), message.data(), message.size());
  state[sizeof(Code) + message.size()] = '\0';
  _state = state;
}
inline Error::Error(const Error& other) {
_state = _copy_state(other._state);
}
inline Error::Error(Error&& other) {
_state = nullptr;
std::swap(other._state, _state);
}
inline Error::Error(Code code) {
char* state = new char[sizeof(Code)+1];
*(Code*)state = code;
// state[0] = static_cast<char>(code);
state[sizeof(Code)] = 0;
_state = state;
}
inline Error::Error(int code) : Error{(Error::Code)code} {}
inline Error::Error(Code code, const std::string& msg) {
_set_state(code, msg);
}
inline Error::Error(int code, const std::string& msg) : Error{(Error::Code)code, msg} {}
inline Error::Error(const std::string& message) {
_set_state(0, message);
}
// Copy-assign: replace our state with a deep copy. The pointer-equality
// guard makes self-assignment a no-op.
inline Error& Error::operator=(const Error& other) {
  if (_state == other._state) {
    return *this;
  }
  delete[] _state;
  _state = _copy_state(other._state);
  return *this;
}
// Move-assign: swap states; `other` releases our old state when it dies.
inline Error& Error::operator=(Error&& other) {
  if (_state != other._state) {
    std::swap(other._state, _state);
  }
  return *this;
}
} // namespace
#pragma once
namespace rx {
// Short project-wide alias for std::function.
template<class T> using func = std::function<T>;
} // namespace
#pragma once
namespace rx {
struct OnceFlag { volatile long s = 0; };
template<class F> inline void once(OnceFlag& pred, F f) {
if (pred.s == 0L && __sync_bool_compare_and_swap(&pred.s, 0L, 1L)) f();
}
// Perform f exactly once with no risk of thread race conditions.
// Example:
// static OnceFlag flag; once(flag, []{ cerr << "happens exactly once" << endl; });
} // namespace
#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstring>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <unordered_set>
#include <utility>
#pragma once
namespace rx {
struct PlainRefCounter {
using value_type = uint32_t;
static void retain(value_type& v) { ++v; }
static bool release(value_type& v) { return --v == 0; }
};
struct AtomicRefCounter {
using value_type = volatile uint32_t;
static void retain(value_type& v) { __sync_add_and_fetch(&v, 1); }
static bool release(value_type& v) { return __sync_sub_and_fetch(&v, 1) == 0; }
};
// Intrusive smart pointer. T must provide retainRef()/releaseRef() (e.g. by
// deriving from RefCounted). `self` is the managed pointer; the const
// resetSelf() overloads deliberately mutate through const_cast so that
// by-value lambda captures can drop their reference.
template <typename T>
struct Ref {
T* self;
Ref() : self{nullptr} {}
Ref(std::nullptr_t) : self{nullptr} {}
// add_ref == false (default): adopt an existing +1 reference without retaining.
explicit Ref(T* p, bool add_ref=false) : self{p} { if (add_ref && p) { self->retainRef(); } }
Ref(const Ref& rhs) : self{rhs.self} { if (self) self->retainRef(); }
Ref(const Ref* rhs) : self{rhs->self} { if (self) self->retainRef(); }
Ref(Ref&& rhs) { self = std::move(rhs.self); rhs.self = 0; }
~Ref() { if (self) self->releaseRef(); }
// Release and clear. Const on purpose — see struct comment.
void resetSelf(std::nullptr_t=nullptr) const {
if (self) {
auto* s = const_cast<Ref*>(this);
s->self->releaseRef();
s->self = nullptr;
}
}
// Point at `p` instead: retain the new pointee BEFORE releasing the old one,
// which makes self-reset (p == self) safe.
Ref& resetSelf(const T* p) const {
T* old = self;
const_cast<Ref*>(this)->self = const_cast<T*>(p);
if (self) self->retainRef();
if (old) old->releaseRef();
return *const_cast<Ref*>(this);
}
Ref& operator=(const Ref& rhs) { return resetSelf(rhs.self); }
Ref& operator=(T* rhs) { return resetSelf(rhs); }
Ref& operator=(const T* rhs) { return resetSelf(rhs); }
Ref& operator=(std::nullptr_t) { return resetSelf(nullptr); }
// Move-assign: drop our reference first, then steal rhs's pointer.
Ref& operator=(Ref&& rhs) {
if (self != rhs.self && self) {
self->releaseRef();
self = nullptr;
}
std::swap(self, rhs.self);
return *this;
}
T* operator->() const { return self; }
operator bool() const { return self != nullptr; }
};
// CRTP base adding an intrusive reference count with policy C
// (PlainRefCounter or AtomicRefCounter).
template <typename T, class C>
struct RefCounted {
  typename C::value_type __refc = 1;  // creator holds the first reference
  virtual ~RefCounted() = default;    // virtual: deleted through base in releaseRef()
  void retainRef() { C::retain(__refc); }
  // Drops one reference; deletes the object and returns true when it was the
  // last. FIX: replaced the non-standard GNU statement-expression
  // `({ delete this; true; })` with portable C++.
  bool releaseRef() {
    if (!C::release(__refc)) return false;
    delete this;
    return true;
  }
  using Ref = Ref<T>;
};
// Convenience aliases selecting the counting policy.
template <typename T> using UnsafeRefCounted = RefCounted<T, PlainRefCounter>;
template <typename T> using SafeRefCounted = RefCounted<T, AtomicRefCounter>;
// Example:
//
// struct ReqBase {
// uv_fs_t uvreq;
// };
//
// struct StatReq final : ReqBase, SafeRefCounted<StatReq> {
// StatReq(StatCallback&& cb) : Req{}, cb{fwdarg(cb)} {}
// StatCallback cb;
// };
//
} // namespace
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment