arrow_parquet_memory_repro
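The function below is an excerpt from a larger file; the includes, namespace aliases, and the bamboo::main_constants column list it relies on are not part of the gist. A plausible preamble for building it against a recent Arrow release (12+, where Acero lives in its own namespace) might look like the following; the exact headers and aliases are assumptions, not taken from the original.

// Assumed preamble -- not part of the original gist.
#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include <arrow/acero/exec_plan.h>        // ExecPlan, Declaration, DeclarationToReader
#include <arrow/acero/options.h>          // ProjectNodeOptions
#include <arrow/api.h>                    // Status, RecordBatch, ARROW_ASSIGN_OR_RAISE
#include <arrow/compute/expression.h>     // Expression, field_ref, project
#include <arrow/dataset/discovery.h>      // FileSystemDatasetFactory, FileSystemFactoryOptions
#include <arrow/dataset/file_parquet.h>   // ParquetFileFormat, ParquetFragmentScanOptions
#include <arrow/dataset/plan.h>           // arrow::dataset::internal::Initialize
#include <arrow/dataset/scanner.h>        // ScanOptions, ScanNodeOptions
#include <arrow/filesystem/filesystem.h>  // FileSystem, FileSystemFromUri
#include <arrow/io/interfaces.h>          // GetIOThreadPoolCapacity

namespace ac = arrow::acero;
namespace cp = arrow::compute;
using arrow::RecordBatch;
using arrow::Status;
using ac::Declaration;
using ac::ExecPlan;

// bamboo::main_constants::MDP_US_MODEL_STATE_100_COLS is a project-specific
// list of column names and is not reproduced here.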
Status exp_read_s3_scan_raw(const std::string& fs,
                            const std::string& bucket,
                            const std::vector<std::string>& filenames,
                            const int dup_factor) {
  // Read an S3 table directly through the scan node
  std::cout << "exp_read_s3_scan_raw()" << std::endl;
  std::vector<std::string> dup_filenames = filenames;
  for (int i = 0; i < dup_factor; ++i) {
    dup_filenames.insert(dup_filenames.end(), filenames.begin(), filenames.end());
  }

  // For local filesystem reading, file:/// may work here as well. You may
  // need to #include <arrow/dataset/file_base.h>
  // const std::string uri_string = "ts3://";
  const std::string uri_string = fs;
  // const std::string uri_string = "gs://";

  // Filesystem and path
  ARROW_ASSIGN_OR_RAISE(const std::shared_ptr<arrow::fs::FileSystem> filesystem,
                        arrow::fs::FileSystemFromUri(uri_string, nullptr));

  // Scan node expects paths (bucket name and file name all in one string)
  std::vector<std::string> paths;
  for (auto i = 0; i < dup_filenames.size(); i++) {
    paths.emplace_back(bucket + "/" + dup_filenames[i]);
  }

  ARROW_ASSIGN_OR_RAISE(
      auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
                        filesystem, paths,
                        std::make_shared<arrow::dataset::ParquetFileFormat>(),
                        arrow::dataset::FileSystemFactoryOptions()));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());

  // Register "scan" node
  arrow::dataset::internal::Initialize();

  ARROW_ASSIGN_OR_RAISE(auto plan, ExecPlan::Make());

  std::cout << "Current IO Thread Pool capacity: "
            << arrow::io::GetIOThreadPoolCapacity() << std::endl;
  // arrow::io::SetIOThreadPoolCapacity(32);
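
  // Pre-buffering makes the Parquet reader coalesce the column-chunk byte
  // ranges it needs into fewer, larger reads. That cuts down round trips to
  // S3, but the coalesced ranges are held in memory until they are consumed,
  // which is what this repro exercises. The commented-out cache limits below
  // tune how aggressively ranges are coalesced.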
  auto fragment_scan_options =
      std::make_shared<arrow::dataset::ParquetFragmentScanOptions>();
  fragment_scan_options->arrow_reader_properties->set_pre_buffer(true);
  // fragment_scan_options->arrow_reader_properties->set_cache_hole_size_limit(8
  // * 1024);
  // fragment_scan_options->arrow_reader_properties->set_cache_range_size_limit(32
  // * 1024 * 1024);

  auto scan_options = std::make_shared<arrow::dataset::ScanOptions>();
  scan_options->fragment_scan_options = fragment_scan_options;

  // Add a projection. This will null out the non-selected columns, but they
  // will still exist in the schema. A projection will be added below.
  std::vector<std::string> proj_names =
      bamboo::main_constants::MDP_US_MODEL_STATE_100_COLS;
  std::vector<cp::Expression> proj_exprs;
  for (auto i = 0; i < proj_names.size(); i++) {
    proj_exprs.push_back(cp::field_ref(proj_names[i]));
  }
  scan_options->projection = cp::project(proj_exprs, proj_names);

  auto scan_node_options = arrow::dataset::ScanNodeOptions{
      dataset, scan_options, /*require_sequenced_output=*/true};
  // scan_node_options.scan_options->batch_size = 256 * 1024;
  // scan_node_options.scan_options->batch_readahead = 4;
  // scan_node_options.scan_options->fragment_readahead = 16;
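  // batch_size controls the rows per emitted batch, while batch_readahead and
  // fragment_readahead (commented out above) bound how many batches and
  // fragments the scanner keeps in flight; together they drive peak memory.
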
  Declaration scan{"scan", std::move(scan_node_options)};

  // Add a projection for fairness in comparison to exp_parquetjni,
  // because the projection built into scan will only null out
  // non-selected columns instead of completely removing them
  // (while ParquetJni does remove them).
  Declaration project{"project",
                      {std::move(scan)},
                      std::move(ac::ProjectNodeOptions(proj_exprs, proj_names)),
                      "project"};

  ARROW_ASSIGN_OR_RAISE(auto reader,
                        ac::DeclarationToReader(std::move(project), false));

  int64_t total_num_rows = 0;
  int64_t total_num_columns = 0;
  std::shared_ptr<RecordBatch> batch;
  while (true) {
    ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
    if (batch == nullptr) {
      break;
    } else {
      // std::cout << "Received a batch. Size: " << batch->num_rows() << std::endl;
      total_num_rows += batch->num_rows();
      if (total_num_columns == 0) {
        total_num_columns = batch->num_columns();
      }
      // std::cout << batch->ToString() << std::endl;
    }
  }
  std::cout << "Total number of rows: " << total_num_rows << std::endl;
  std::cout << "Total number of columns: " << total_num_columns << std::endl;

  ARROW_RETURN_NOT_OK(reader->Close());
  return Status::OK();
}
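
A hypothetical driver, not part of the gist, sketching how the repro might be invoked: the bucket and file names are placeholders, S3 support additionally needs <arrow/filesystem/s3fs.h>, and AWS credentials are assumed to be available in the environment.

// Hypothetical driver -- bucket and file names are placeholders.
#include <arrow/filesystem/s3fs.h>  // EnsureS3Initialized, EnsureS3Finalized

int main() {
  // The S3 subsystem must be initialized before FileSystemFromUri("s3://...").
  if (auto st = arrow::fs::EnsureS3Initialized(); !st.ok()) {
    std::cerr << st.ToString() << std::endl;
    return 1;
  }
  auto st = exp_read_s3_scan_raw("s3://", "my-bucket",
                                 {"part-0.parquet", "part-1.parquet"},
                                 /*dup_factor=*/4);
  std::cout << st.ToString() << std::endl;
  (void)arrow::fs::EnsureS3Finalized();  // ignore finalization errors in this sketch
  return st.ok() ? 0 : 1;
}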