Skip to content

Instantly share code, notes, and snippets.

@icexelloss
Created September 6, 2023 20:34
Show Gist options
  • Save icexelloss/88195de046962e1d043c99d96e1b8b43 to your computer and use it in GitHub Desktop.
Save icexelloss/88195de046962e1d043c99d96e1b8b43 to your computer and use it in GitHub Desktop.
arrow_parquet_memory_repro
/// Read a set of Parquet files through an Acero "scan" -> "project" plan and
/// print the total number of rows and columns that were produced.
///
/// Every batch is dropped as soon as it has been counted, so the function only
/// drives the read path (it is used as a memory-usage reproduction).
///
/// @param fs          URI handed to arrow::fs::FileSystemFromUri to obtain the
///                    filesystem (the original notes mention "ts3://", "gs://"
///                    and, for local reads, "file:///").
/// @param bucket      Prefix joined to every file name with a '/'.
/// @param filenames   File names, relative to `bucket`.
/// @param dup_factor  Number of ADDITIONAL times the whole file list is
///                    appended; the list is therefore scanned
///                    `dup_factor + 1` times in total (values <= 0 scan it
///                    once).
/// @return Status::OK() on success, otherwise the first error reported by
///         Arrow while building the dataset or reading from it.
Status exp_read_s3_scan_raw(const std::string& fs,
                            const std::string& bucket,
                            const std::vector<std::string>& filenames,
                            const int dup_factor) {
  // Read a S3 table directly through scan node
  std::cout << "exp_read_s3_scan_raw()" << std::endl;

  // Filesystem and path.
  // For local filesystem reading, file:/// may work here as well. You may
  // need to #include <arrow/dataset/file_base.h>
  ARROW_ASSIGN_OR_RAISE(const std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(fs, nullptr));

  // Scan node expects paths (bucket name and file name all in one string).
  // The file list is emitted once, plus `dup_factor` extra times, in order.
  using PathCount = std::vector<std::string>::size_type;
  const PathCount num_copies =
      static_cast<PathCount>(dup_factor > 0 ? dup_factor : 0) + 1;
  std::vector<std::string> paths;
  paths.reserve(filenames.size() * num_copies);
  for (PathCount copy = 0; copy < num_copies; ++copy) {
    for (const std::string& filename : filenames) {
      paths.emplace_back(bucket + "/" + filename);
    }
  }

  ARROW_ASSIGN_OR_RAISE(
      auto factory,
      arrow::dataset::FileSystemDatasetFactory::Make(
          filesystem, paths,
          std::make_shared<arrow::dataset::ParquetFileFormat>(),
          arrow::dataset::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());

  // Register "scan" node
  arrow::dataset::internal::Initialize();

  std::cout << "Current IO Thread Pool capacity: "
            << arrow::io::GetIOThreadPoolCapacity() << std::endl;
  // Tuning knob: arrow::io::SetIOThreadPoolCapacity(32);

  // Parquet-specific scan settings: pre-buffering is switched on.
  auto fragment_scan_options =
      std::make_shared<arrow::dataset::ParquetFragmentScanOptions>();
  fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
  // Tuning knobs:
  //   fragment_scan_options->arrow_reader_properties
  //       ->set_cache_hole_size_limit(8 * 1024);
  //   fragment_scan_options->arrow_reader_properties
  //       ->set_cache_range_size_limit(32 * 1024 * 1024);

  auto scan_options = std::make_shared<arrow::dataset::ScanOptions>();
  scan_options->fragment_scan_options = fragment_scan_options;

  // Build one field reference per selected column.
  const std::vector<std::string> proj_names =
      bamboo::main_constants::MDP_US_MODEL_STATE_100_COLS;
  std::vector<cp::Expression> proj_exprs;
  proj_exprs.reserve(proj_names.size());
  for (const std::string& name : proj_names) {
    proj_exprs.push_back(cp::field_ref(name));
  }

  // Add a projection to the scan. This will null out the non-selected
  // columns, but they will still exist in the schema. A separate "project"
  // node is added below to drop them.
  scan_options->projection = cp::project(proj_exprs, proj_names);

  auto scan_node_options = arrow::dataset::ScanNodeOptions{
      dataset, scan_options, /*require_sequenced_output=*/true};
  // Tuning knobs:
  //   scan_node_options.scan_options->batch_size = 256 * 1024;
  //   scan_node_options.scan_options->batch_readahead = 4;
  //   scan_node_options.scan_options->fragment_readahead = 16;
  Declaration scan{"scan", std::move(scan_node_options)};

  // Add a projection for fairness in comparison to exp_parquetjni,
  // because the projection built into scan will only null out
  // non-selected columns instead of completely removing them
  // (while ParquetJni does remove them).
  Declaration project{"project",
                      {std::move(scan)},
                      ac::ProjectNodeOptions(proj_exprs, proj_names),
                      "project"};

  ARROW_ASSIGN_OR_RAISE(
      auto reader,
      ac::DeclarationToReader(std::move(project), /*use_threads=*/false));

  // Drain the reader; a null batch marks the end of the stream.
  int64_t total_num_rows = 0;
  int64_t total_num_columns = 0;
  std::shared_ptr<RecordBatch> batch;
  for (;;) {
    ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
    if (batch == nullptr) {
      break;
    }
    total_num_rows += batch->num_rows();
    // The column count is taken from the first batch that reports a non-zero
    // number of columns.
    if (total_num_columns == 0) {
      total_num_columns = batch->num_columns();
    }
  }

  std::cout << "Total number of rows: " << total_num_rows << std::endl;
  std::cout << "Total number of columns: " << total_num_columns << std::endl;
  ARROW_RETURN_NOT_OK(reader->Close());
  return Status::OK();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment