Skip to content

Instantly share code, notes, and snippets.

@MBkkt
Last active May 7, 2023 09:08
Show Gist options
  • Save MBkkt/e60be24e7c6ecae6b929b2a682e0bfe1 to your computer and use it in GitHub Desktop.
Save MBkkt/e60be24e7c6ecae6b929b2a682e0bfe1 to your computer and use it in GitHub Desktop.
Aggregator -- pseudo code
enum class AggregatorType : uint64_t {
kAny = 0, // At least few functions needed, so runtime detection
// This is single operation case.
// Now only single exception: average(kCount + kSum).
// It is assumed that if we need one function, we instantiate it, if several then kAny.
// But if for some reason we decide to optimize a specific case (for example Min + Max) this is also possible!
kCount = 1 << 0,
kMin = 1 << 1,
kMax = 1 << 2,
kSum = 1 << 3,
kVar = 1 << 4,
kUnique = 1 << 5, // Can be used for unique and distinct
kSorted = 1 << 6,
kBitAnd = 1 << 7,
kBitOr = 1 << 8,
kBitXor = 1 << 9,
};
struct Unit {};
template<typename T>
struct State {
public:
void Construct() {
new (&impl_) T{};
}
void Destroy() {
impl_.~T();
}
T& GetImpl() {
return impl_;
}
private:
union {
T impl_;
};
};
struct EmptyState {
struct Impl {
template<typename V>
[[always_inline]] void Push(V&&) {}
};
Impl GetImpl();
};
template<typename T, bool Needed>
using Wrapper = std::conditional_t<Needed, T, EmptyState>;
struct AggregatorBase {
virtual ~AggregatorBase();
virtual Push(VPack) = 0;
virtual Push(double) = 0;
...
};
template<bool DBServer, typename T, AggregatorType Type>
struct Aggregator final : AggregatorBase {
// Used on DBServer for different shards
void MergeFrom(Aggregator&& other);
// Used on DBServer, it doesn't use vpack
Serialize(BinaryOutput& output);
// Used on Coordinator for different DBServer
void MergeFrom(BinaryInput& input);
// Used on DBServer to aggregate values
template<typename Value>
void Push(Value v) {
// if constexpr Value == VPack/Any/AqlValue make single switch
PushImpl(...);
}
// Can be used on DBServer when it SingleServer
// Used on Coordinator to Produce value for aggregation function
template<typename Result>
Result Count() const;
double Avg() const;
double Sum() const;
T Min() const;
T Max() const;
// ...
private:
void SetWasNotNumeric() {
if constexpr (T == VPack/Any/etc && Type == kAny/kSum/etc) {
was_not_numeric_ = true;
}
}
void PushImpl(bool v);
void PushImpl(uint64_t v);
void PushImpl(std::string_view s) {
SetWasNotNumeric();
}
void PushImpl(double v) {
if constexpr (Type == kAny) {
auto control = control_.GetImpl();
if (control.Has(AggregatorType::kCount)) {
count_.GetImpl().Push(v);
}
if (control.Has(AggregatorType::kMin)) {
min_.GetImpl().Push(v);
}
if (control.Has(AggregatorType::kMax)) {
max_.GetImpl().Push(v);
}
// ...
// sum_.GetImpl().Push(v);
// unique_.GetImpl().Push(v);
// sorted_.GetImpl().Push(v);
// bit_and_.GetImpl().Push(v);
// bit_or_.GetImpl().Push(v);
// bit_xor_.GetImpl().Push(v);
} else {
count_.GetImpl().Push(v);
min_.GetImpl().Push(v);
max_.GetImpl().Push(v);
sum_.GetImpl().Push(v);
unique_.GetImpl().Push(v);
sorted_.GetImpl().Push(v);
bit_and_.GetImpl().Push(v);
bit_or_.GetImpl().Push(v);
bit_xor_.GetImpl().Push(v);
}
}
template<AggregatorType Requested>
static constexpr bool Needed() {
if constexpr (Type == 0) {
return true;
} else {
return (Requested & Actual) != 0;
}
}
[[no_unique_address]] Wrapper<bool, ...> was_not_numeric_{false};
[[no_unique_address]] Wrapper<ControlState, Needed<AggregatorType::kAny>> control_;
// numeric
[[no_unique_address]] Wrapper<CountState, Needed<AggregatorType::kCount>> count_;
[[no_unique_address]] Wrapper<SumState, Needed<AggregatorType::kSum>> sum_;
[[no_unique_address]] Wrapper<BitAnd, Needed<AggregatorType::kBitAnd>> bit_and_;
[[no_unique_address]] Wrapper<BitOr, Needed<AggregatorType::kBit>> bit_or_;
[[no_unique_address]] Wrapper<BitXor, Needed<AggregatorType::kCount>> bit_xor_;
// any
[[no_unique_address]] Wrapper<MinState<T>, Needed<AggregatorType::kMin>> min_;
[[no_unique_address]] Wrapper<MaxState<T>, Needed<AggregatorType::kMax>> max_;
[[no_unique_address]] Wrapper<UniqueState<T>, Needed<AggregatorType::kUnique>> unique_;
[[no_unique_address]] Wrapper<SortedState<T>, Needed<AggregatorType::kSorted>> sorted_;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment