Skip to content

Instantly share code, notes, and snippets.

@samatjain
Created March 12, 2021 23:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save samatjain/05646eaf9b133f096f08307abdfb6c34 to your computer and use it in GitHub Desktop.
Save samatjain/05646eaf9b133f096f08307abdfb6c34 to your computer and use it in GitHub Desktop.
du vs. io_uring comparison
#include <iostream>
#include <fstream>
#include <filesystem>
#include <chrono>
using namespace std;
using Timer = std::chrono::high_resolution_clock;
using TimePoint = std::chrono::time_point<Timer>;
using ms = std::chrono::milliseconds;
using ns = std::chrono::nanoseconds;
namespace fs = std::filesystem;
int main(int argc, char* argv[]) {
fs::path cwd = fs::current_path();
TimePoint start, end;
unsigned long total_size_bytes = 0;
start = Timer::now();
for (auto& de: fs::recursive_directory_iterator(cwd)) {
const fs::path& p = de.path();
if (de.is_regular_file()) {
total_size_bytes += fs::file_size(p);
}
}
end = Timer::now();
unsigned duration_ms = std::chrono::duration_cast<ms>(end - start).count();
cout << argv[0] <<": path=" << cwd << " total_size=" << total_size_bytes << " duration_ms=" << duration_ms << endl;
}
#include <iostream>
#include <fstream>
#include <filesystem>
#include <future>
#include <thread>
#include <vector>
#include <list>
#include <map>
#include <chrono>
#include <cstdlib>
#include <liburing.h>
#include <fcntl.h>
#include <sys/stat.h>
using namespace std;
using Timer = std::chrono::high_resolution_clock;
using TimePoint = std::chrono::time_point<Timer>;
using ms = std::chrono::milliseconds;
using ns = std::chrono::nanoseconds;
namespace fs = std::filesystem;
class IoManager {
public:
IoManager() {
int ret;
ret = io_uring_queue_init(IO_URING_QUEUE_MAX_SIZE, &_ring, 0);
if (ret) {
cerr << "ring setup failed" << endl;
exit(1);
}
}
~IoManager() {
io_uring_queue_exit(&_ring);
}
/**
* @return How many items were drained
*/
size_t DrainIoRing(int expected) {
//cout << "Calling DrainIoRing" << endl;
int drained_items = 0;
// While there are events for us
for (int i = 0; i < expected; ++i) {
struct io_uring_cqe *cqe; // completion queue entry
int ret = io_uring_wait_cqe(&_ring, &cqe);
if (ret < 0) {
cerr << "waiting for completion failed: " << ret << endl;
exit(1);
}
// Retrieve pointer into _in_flights
fs::path* path_key_completed = (fs::path*)(cqe->user_data);
// Find in in_flights
auto it = _in_flights.find(*path_key_completed);
unsigned long file_size = it->second.stx_size;
_total_size += file_size;
//cout << "CQE received for path=" << *path_key_completed << " size=" << file_size << endl;
// Mark as seen so kernel can re-use slot in ring
io_uring_cqe_seen(&_ring, cqe);
// Clear in flights (commented out for perf testing)
//_in_flights.erase(it);
drained_items++;
}
return drained_items;
}
size_t AsyncGetFileSizes(const vector<fs::path>& paths) {
struct statx x1;
const auto total_items = paths.size();
int inserted_items = 0;
int completed_items = 0;
// Compute and send requests
for (const auto& path: paths) {
//cout << "io_uring_sq_space_left=" << io_uring_sq_space_left(&_ring) << endl;
if (inserted_items == IO_URING_QUEUE_MAX_SIZE) {
inserted_items = 0;
/*int ret = io_uring_submit(&_ring);
if (ret <= 0) {
cerr << "seq submit failed: " << ret << endl;
exit(1);
}*/
while (io_uring_submit(&_ring) > 0) {
//cerr << "unable to submit, need to sleep!" << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
};
completed_items += DrainIoRing(IO_URING_QUEUE_MAX_SIZE);
}
// ASSUMPTION: this list doesn't get reallocated and pointers stay
// valid!
auto [it, emplace_success] = \
_in_flights.emplace(path, x1);
// Used to identify requests in SQE/CQE
const fs::path* path_key = &(it->first);
struct statx* path_statx = &(it->second);
//cout << "Enqueing path=" << *path_key << endl;
struct io_uring_sqe *sqe; // submission queue entry;
sqe = io_uring_get_sqe(&_ring);
if (!sqe) {
cerr << "get sqe failed" << endl;
exit(1);
}
//cout << "Processing path=" << path << endl;
io_uring_prep_statx(sqe, -1, path.c_str(), 0, STATX_ALL, path_statx);
io_uring_sqe_set_data(sqe, const_cast<fs::path*>(path_key));
inserted_items++;
}
/*cout << "End of processing total_items=" << total_items
<< " paths, remaining_items=" << (total_items - completed_items)
<< " so far, total_size=" << _total_size << endl;*/
/*cout << "Remaining items for which we await CQEs:";
for (const auto& [path, statx]: _in_flights)
cout << " " << path << endl;*/
while (io_uring_submit(&_ring) > 0) {
//cerr << "unable to submit, need to sleep!" << endl;
std::this_thread::sleep_for(std::chrono::milliseconds(1));
};
if (completed_items != total_items)
completed_items += DrainIoRing(total_items - completed_items);
return _total_size;
}
private:
const unsigned IO_URING_QUEUE_MAX_SIZE = 64;
struct io_uring _ring;
std::map<fs::path, struct statx> _in_flights;
unsigned long _total_size = 0;
};
int main(int argc, char* argv[]) {
fs::path cwd = fs::current_path();
TimePoint start, end;
IoManager iom;
unsigned long total_size_bytes = 0;
start = Timer::now();
int count = 0;
std::vector<fs::path> paths;
for (auto& de: fs::recursive_directory_iterator(cwd)) {
const fs::path& p = de.path();
if (de.is_regular_file()) {
paths.emplace_back(p);
}
}
total_size_bytes += iom.AsyncGetFileSizes(paths);
end = Timer::now();
unsigned duration_ms = std::chrono::duration_cast<ms>(end - start).count();
cout << argv[0] << ": path=" << cwd << " total_size=" << total_size_bytes << " duration_ms=" << duration_ms << endl;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment