@25077667
Created May 21, 2024 19:35
Efficient Resource Management and Evaluation of Large-scale Node Processing in C++
#include <iostream>
#include <memory>
#include <vector>
#include <string>
#include <stdexcept>
#include <stack>
#include <chrono>
#include <type_traits> // for std::is_base_of used in the static_asserts below

// Resource pool for data locality
constexpr auto file_max_number = 20'000'000;
constexpr auto process_max_number = 1'000'000;
struct ResourcePool
{
    std::vector<std::string> fileNamePool{file_max_number};
    std::vector<std::string> fileHashPool{file_max_number};
    std::vector<std::string> processNamePool{process_max_number};
    std::vector<std::string> processTcpPool{process_max_number};
};

ResourcePool resourcePool;
enum class NodeType
{
    File,
    Process,
    Combine,
    Processor,
    Unknown
};
struct ScanExpression
{
    virtual ~ScanExpression() = default;
    virtual void evaluate() = 0;
    virtual int isLeaf() const { return 0; }
    virtual NodeType getType() const { return NodeType::Unknown; }
};
struct ScanNode : public ScanExpression
{
    bool evaluated = false;

    virtual void process() const = 0;

    // Evaluate at most once: delegate to process() and remember that it ran.
    __always_inline void evaluate() override
    {
        if (!evaluated)
        {
            process();
            evaluated = true;
        }
    }
};
struct ScanProcessor;
struct FileNode;
struct ProcessNode;
struct CombineProcessor;
struct FileNode : public ScanNode
{
    size_t filenameIndex;

    FileNode(size_t index) : filenameIndex(index) {}
    FileNode(FileNode &&) = default;
    FileNode &operator=(FileNode &&) = default;

    __always_inline void process() const override
    {
        std::clog << "Processing file: " << resourcePool.fileNamePool[filenameIndex] << "\n";
    }

    NodeType getType() const override { return NodeType::File; }
    int isLeaf() const override { return 1; }
};
struct ProcessNode : public ScanNode
{
    size_t processNameIndex;

    ProcessNode(size_t index) : processNameIndex(index) {}
    ProcessNode(ProcessNode &&) = default;
    ProcessNode &operator=(ProcessNode &&) = default;

    __always_inline void process() const override
    {
        std::clog << "Executing process: " << resourcePool.processNamePool[processNameIndex] << "\n";
    }

    NodeType getType() const override { return NodeType::Process; }
    int isLeaf() const override { return 1; }
};
struct ScanProcessor : public ScanNode
{
    std::shared_ptr<ScanNode> lhs;
    std::shared_ptr<ScanNode> rhs;

    template <typename Left, typename Right>
    ScanProcessor(std::shared_ptr<Left> left, std::shared_ptr<Right> right)
        : lhs(std::static_pointer_cast<ScanNode>(left)), rhs(std::static_pointer_cast<ScanNode>(right))
    {
        static_assert(std::is_base_of<ScanNode, Left>::value, "Left must derive from ScanNode");
        static_assert(std::is_base_of<ScanNode, Right>::value, "Right must derive from ScanNode");
    }

    ScanProcessor(ScanProcessor &&) = default;
    ScanProcessor &operator=(ScanProcessor &&) = default;

    // Traverse the expression tree with an explicit stack (no recursion), collect the
    // leaf nodes, then evaluate them in reverse collection order.
    __always_inline void evaluate() override
    {
        std::stack<std::weak_ptr<ScanNode>> eval_seq_stack;
        std::vector<std::weak_ptr<ScanNode>> eval_seq_queue;
        eval_seq_stack.push(lhs);
        eval_seq_stack.push(rhs);
        while (!eval_seq_stack.empty())
        {
            auto node = eval_seq_stack.top();
            eval_seq_stack.pop();
            auto node_ptr = node.lock();
            if (!node_ptr)
                continue; // owning shared_ptr already released; skip defensively
            if (!node_ptr->isLeaf())
            {
                auto processor = std::static_pointer_cast<ScanProcessor>(node_ptr);
                eval_seq_stack.push(processor->rhs);
                eval_seq_stack.push(processor->lhs);
            }
            else
            {
                eval_seq_queue.push_back(node);
            }
        }
        std::clog << "Evaluating nodes" << "\n";
        std::clog << "Node size: " << eval_seq_queue.size() << "\n";
        for (auto it = eval_seq_queue.rbegin(); it != eval_seq_queue.rend(); ++it)
        {
            auto node = it->lock();
            if (node)
                node->evaluate();
        }
        std::clog << "Finished evaluating nodes" << "\n";
    }

    NodeType getType() const override { return NodeType::Processor; }
};
struct CombineProcessor : public ScanProcessor
{
    template <typename Left, typename Right>
    CombineProcessor(std::shared_ptr<Left> left, std::shared_ptr<Right> right)
        : ScanProcessor(left, right)
    {
        static_assert(std::is_base_of<ScanNode, Left>::value, "Left must derive from ScanNode");
        static_assert(std::is_base_of<ScanNode, Right>::value, "Right must derive from ScanNode");
    }

    CombineProcessor(CombineProcessor &&) = default;
    CombineProcessor &operator=(CombineProcessor &&) = default;

    __always_inline void process() const override
    {
        std::clog << "Combining results from nodes" << "\n";
    }

    NodeType getType() const override { return NodeType::Combine; }
};
// Note: the template parameter name shadows the NodeType enum inside this function.
template <typename NodeType, typename... Args>
std::shared_ptr<NodeType> make_node(Args &&...args)
{
    return std::make_shared<NodeType>(std::forward<Args>(args)...);
}
int main()
{
    // Initialize resource pool data
    for (size_t i = 0; i < file_max_number; ++i)
    {
        resourcePool.fileNamePool[i] = "file" + std::to_string(i) + ".txt";
        resourcePool.fileHashPool[i] = "hash" + std::to_string(i);
    }
    for (size_t i = 0; i < process_max_number; ++i)
    {
        resourcePool.processNamePool[i] = "Process " + std::to_string(i);
        resourcePool.processTcpPool[i] = "TCP " + std::to_string(i);
    }

    // Create one leaf node per file and per process.
    auto FilePool = std::vector<std::shared_ptr<FileNode>>(file_max_number);
    for (size_t i = 0; i < file_max_number; ++i)
    {
        FilePool[i] = make_node<FileNode>(i);
    }
    auto ProcessPool = std::vector<std::shared_ptr<ProcessNode>>(process_max_number);
    for (size_t i = 0; i < process_max_number; ++i)
    {
        ProcessPool[i] = make_node<ProcessNode>(i);
    }

    // Fold all leaves into one left-leaning chain of CombineProcessor nodes.
    auto combinedNodes = make_node<CombineProcessor>(FilePool[0], FilePool[1]);
    for (size_t i = 2; i < file_max_number; ++i)
    {
        combinedNodes = make_node<CombineProcessor>(combinedNodes, FilePool[i]);
    }
    for (size_t i = 0; i < process_max_number; ++i)
    {
        combinedNodes = make_node<CombineProcessor>(combinedNodes, ProcessPool[i]);
    }

    // Time the iterative evaluation of the whole tree.
    auto start = std::chrono::high_resolution_clock::now();
    combinedNodes->evaluate();
    auto end = std::chrono::high_resolution_clock::now();
    std::chrono::duration<double> elapsed = end - start;
    std::cout << "For " << file_max_number << " files and " << process_max_number
              << " processes, elapsed time: " << elapsed.count() << "s\n";
    return 0;
}
Run environment: the stack limit was raised to 1 GiB before running, most likely because releasing the roughly 21-million-node chain of shared_ptr-linked CombineProcessor nodes recurses once per node when the root goes out of scope, which would overflow a default-sized stack.

ulimit -s 1048576

➜ test ulimit -a
-t: cpu time (seconds) unlimited
-f: file size (blocks) unlimited
-d: data seg size (kbytes) unlimited
-s: stack size (kbytes) 1048576
-c: core file size (blocks) unlimited
-m: resident set size (kbytes) unlimited
-u: processes 127280
-n: file descriptors 1024
-l: locked-in-memory size (kbytes) 8192
-v: address space (kbytes) unlimited
-x: file locks unlimited
-i: pending signals 127280
-q: bytes in POSIX msg queues 819200
-e: max nice 0
-r: max rt priority 0
-N 15: rt cpu time (microseconds) unlimited
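
If relying on a 1 GiB stack is undesirable, one alternative (not part of the gist) is to sever the chain iteratively before it is destroyed, so destructor recursion never gets deep. The sketch below is a minimal standalone illustration of that pattern; the Node struct, destroy_chain helper, and the 1'000'000 depth are hypothetical stand-ins for the CombineProcessor chain above, not the original code.

#include <cstddef>
#include <memory>
#include <utility>

// Illustrative stand-in for the left-leaning chain: lhs holds the deep
// subtree, rhs holds a leaf.
struct Node
{
    std::shared_ptr<Node> lhs;
    std::shared_ptr<Node> rhs;
};

// Unlink the chain level by level, so each node is released with its deep
// child already detached and no destructor call ever recurses far.
void destroy_chain(std::shared_ptr<Node> root)
{
    while (root)
    {
        std::shared_ptr<Node> next = std::move(root->lhs); // detach the deep side
        root = std::move(next); // the previous level is freed here, shallowly
    }
}

int main()
{
    // Build a chain deep enough that plain recursive destruction would
    // typically overflow a default 8 MiB stack.
    auto root = std::make_shared<Node>();
    for (std::size_t i = 0; i < 1'000'000; ++i)
    {
        auto parent = std::make_shared<Node>();
        parent->lhs = std::move(root);
        parent->rhs = std::make_shared<Node>();
        root = std::move(parent);
    }
    destroy_chain(std::move(root)); // tears the chain down in constant stack space
    return 0;
}

With a teardown like this, the larger stack limit would not be needed for destruction; the gist's evaluate() is already iterative, so evaluation itself does not depend on it either.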