Last active
May 7, 2023 09:08
-
-
Save MBkkt/e60be24e7c6ecae6b929b2a682e0bfe1 to your computer and use it in GitHub Desktop.
Aggregator -- pseudo code
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
enum class AggregatorType : uint64_t { | |
kAny = 0, // At least few functions needed, so runtime detection | |
// This is single operation case. | |
// Now only single exception: average(kCount + kSum). | |
// It is assumed that if we need one function, we instantiate it, if several then kAny. | |
// But if for some reason we decide to optimize a specific case (for example Min + Max) this is also possible! | |
kCount = 1 << 0, | |
kMin = 1 << 1, | |
kMax = 1 << 2, | |
kSum = 1 << 3, | |
kVar = 1 << 4, | |
kUnique = 1 << 5, // Can be used for unique and distinct | |
kSorted = 1 << 6, | |
kBitAnd = 1 << 7, | |
kBitOr = 1 << 8, | |
kBitXor = 1 << 9, | |
}; | |
struct Unit {}; | |
template<typename T> | |
struct State { | |
public: | |
void Construct() { | |
new (&impl_) T{}; | |
} | |
void Destroy() { | |
impl_.~T(); | |
} | |
T& GetImpl() { | |
return impl_; | |
} | |
private: | |
union { | |
T impl_; | |
}; | |
}; | |
struct EmptyState { | |
struct Impl { | |
template<typename V> | |
[[always_inline]] void Push(V&&) {} | |
}; | |
Impl GetImpl(); | |
}; | |
template<typename T, bool Needed> | |
using Wrapper = std::conditional_t<Needed, T, EmptyState>; | |
struct AggregatorBase { | |
virtual ~AggregatorBase(); | |
virtual Push(VPack) = 0; | |
virtual Push(double) = 0; | |
... | |
}; | |
template<bool DBServer, typename T, AggregatorType Type> | |
struct Aggregator final : AggregatorBase { | |
// Used on DBServer for different shards | |
void MergeFrom(Aggregator&& other); | |
// Used on DBServer, it doesn't use vpack | |
Serialize(BinaryOutput& output); | |
// Used on Coordinator for different DBServer | |
void MergeFrom(BinaryInput& input); | |
// Used on DBServer to aggregate values | |
template<typename Value> | |
void Push(Value v) { | |
// if constexpr Value == VPack/Any/AqlValue make single switch | |
PushImpl(...); | |
} | |
// Can be used on DBServer when it SingleServer | |
// Used on Coordinator to Produce value for aggregation function | |
template<typename Result> | |
Result Count() const; | |
double Avg() const; | |
double Sum() const; | |
T Min() const; | |
T Max() const; | |
// ... | |
private: | |
void SetWasNotNumeric() { | |
if constexpr (T == VPack/Any/etc && Type == kAny/kSum/etc) { | |
was_not_numeric_ = true; | |
} | |
} | |
void PushImpl(bool v); | |
void PushImpl(uint64_t v); | |
void PushImpl(std::string_view s) { | |
SetWasNotNumeric(); | |
} | |
void PushImpl(double v) { | |
if constexpr (Type == kAny) { | |
auto control = control_.GetImpl(); | |
if (control.Has(AggregatorType::kCount)) { | |
count_.GetImpl().Push(v); | |
} | |
if (control.Has(AggregatorType::kMin)) { | |
min_.GetImpl().Push(v); | |
} | |
if (control.Has(AggregatorType::kMax)) { | |
max_.GetImpl().Push(v); | |
} | |
// ... | |
// sum_.GetImpl().Push(v); | |
// unique_.GetImpl().Push(v); | |
// sorted_.GetImpl().Push(v); | |
// bit_and_.GetImpl().Push(v); | |
// bit_or_.GetImpl().Push(v); | |
// bit_xor_.GetImpl().Push(v); | |
} else { | |
count_.GetImpl().Push(v); | |
min_.GetImpl().Push(v); | |
max_.GetImpl().Push(v); | |
sum_.GetImpl().Push(v); | |
unique_.GetImpl().Push(v); | |
sorted_.GetImpl().Push(v); | |
bit_and_.GetImpl().Push(v); | |
bit_or_.GetImpl().Push(v); | |
bit_xor_.GetImpl().Push(v); | |
} | |
} | |
template<AggregatorType Requested> | |
static constexpr bool Needed() { | |
if constexpr (Type == 0) { | |
return true; | |
} else { | |
return (Requested & Actual) != 0; | |
} | |
} | |
[[no_unique_address]] Wrapper<bool, ...> was_not_numeric_{false}; | |
[[no_unique_address]] Wrapper<ControlState, Needed<AggregatorType::kAny>> control_; | |
// numeric | |
[[no_unique_address]] Wrapper<CountState, Needed<AggregatorType::kCount>> count_; | |
[[no_unique_address]] Wrapper<SumState, Needed<AggregatorType::kSum>> sum_; | |
[[no_unique_address]] Wrapper<BitAnd, Needed<AggregatorType::kBitAnd>> bit_and_; | |
[[no_unique_address]] Wrapper<BitOr, Needed<AggregatorType::kBit>> bit_or_; | |
[[no_unique_address]] Wrapper<BitXor, Needed<AggregatorType::kCount>> bit_xor_; | |
// any | |
[[no_unique_address]] Wrapper<MinState<T>, Needed<AggregatorType::kMin>> min_; | |
[[no_unique_address]] Wrapper<MaxState<T>, Needed<AggregatorType::kMax>> max_; | |
[[no_unique_address]] Wrapper<UniqueState<T>, Needed<AggregatorType::kUnique>> unique_; | |
[[no_unique_address]] Wrapper<SortedState<T>, Needed<AggregatorType::kSorted>> sorted_; | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment