Created
January 22, 2024 23:24
-
-
Save ianmcook/32393f59c609591f040f5e9b4f315307 to your computer and use it in GitHub Desktop.
Acero Sequence of Declarations for TPC-H Query 06
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#include <iostream> | |
#include <arrow/api.h> | |
#include <arrow/type.h> | |
#include <arrow/result.h> | |
#include <arrow/io/api.h> | |
#include <arrow/compute/api.h> | |
#include <arrow/acero/exec_plan.h> | |
#include <arrow/acero/options.h> | |
#include <parquet/arrow/reader.h> | |
/// Run a fully-assembled Acero plan to completion and print the result.
///
/// @param plan the declaration sequence to execute (consumed by this call)
/// @return OK on success, or the error status raised while running the plan
arrow::Status ExecutePlanAndCollectAsTable(arrow::acero::Declaration plan) {
  // Materialize the plan's sink output into a single in-memory Table.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> result_table,
                        arrow::acero::DeclarationToTable(std::move(plan)));
  std::cout << "Results : " << std::endl << result_table->ToString() << std::endl;
  return arrow::Status::OK();
}
/// Build and run TPC-H Query 06 over "lineitem.parquet" using Acero.
///
/// Reads the Parquet file into a Table, then executes the pipeline
///   table_source -> filter -> project -> aggregate
/// and prints the single-row result (sum of l_extendedprice * l_discount).
///
/// @return OK on success, or the first error from reading or execution
arrow::Status Execute() {
  // read Parquet file
  std::shared_ptr<arrow::io::RandomAccessFile> input;
  ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open("lineitem.parquet"));
  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
  ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader));
  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));

  // table source options: feed the in-memory table in batches of at most 1000 rows
  int max_batch_size = 1000;
  auto table_source_options = arrow::acero::TableSourceNodeOptions{table, max_batch_size};

  // filter options: the TPC-H Q6 predicate, expressed as nested binary "and"
  // calls because arrow::compute::call("and", ...) is strictly binary.
  arrow::compute::Expression filter_expr = arrow::compute::call("and", {
    arrow::compute::call("and", {
      arrow::compute::call("and", {
        // l_shipdate >= DATE '1994-01-01' (8766 days after the 1970-01-01 epoch)
        arrow::compute::greater_equal(arrow::compute::field_ref("l_shipdate"), arrow::compute::literal(arrow::Date32Scalar(8766))),
        // l_shipdate < DATE '1995-01-01' (9131 days after the 1970-01-01 epoch)
        arrow::compute::less(arrow::compute::field_ref("l_shipdate"), arrow::compute::literal(arrow::Date32Scalar(9131)))
      }),
      arrow::compute::call("and", {
        // l_discount BETWEEN 0.05 AND 0.07
        arrow::compute::greater_equal(arrow::compute::field_ref("l_discount"), arrow::compute::literal(0.05)),
        arrow::compute::less_equal(arrow::compute::field_ref("l_discount"), arrow::compute::literal(0.07))
      }),
    }),
    // l_quantity < 24
    arrow::compute::less(arrow::compute::field_ref("l_quantity"), arrow::compute::literal(24.0))
  });
  auto filter_options = arrow::acero::FilterNodeOptions(filter_expr);

  // project options: compute l_extendedprice * l_discount as "product"
  auto project_options = arrow::acero::ProjectNodeOptions{
    {
      arrow::compute::call(
        "multiply",
        {arrow::compute::field_ref("l_extendedprice"), arrow::compute::field_ref("l_discount")}
      )
    },
    {"product"}
  };

  // aggregate options: scalar (no-key) sum of "product", output column "revenue"
  auto options = std::make_shared<arrow::compute::ScalarAggregateOptions>(/*skip_nulls=*/true, /*min_count=*/1);
  auto aggregate_options =
      arrow::acero::AggregateNodeOptions{/*aggregates=*/{{"sum", options, "product", "revenue"}}, /*keys=*/{}};

  // create plan from sequence of declarations
  arrow::acero::Declaration plan =
      arrow::acero::Declaration::Sequence({
        {"table_source", std::move(table_source_options)},
        {"filter", std::move(filter_options)},
        {"project", std::move(project_options)},
        {"aggregate", std::move(aggregate_options)}
      });

  // execute plan; propagate any execution error to the caller.
  // (The original stored the status in an unused local and returned OK
  // unconditionally, silently swallowing execution failures.)
  return ExecutePlanAndCollectAsTable(std::move(plan));
}
int main(int argc, char** argv) { | |
auto status = Execute(); | |
if (!status.ok()) { | |
std::cerr << "Error occurred : " << status.message() << std::endl; | |
return EXIT_FAILURE; | |
} | |
return EXIT_SUCCESS; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is a variation of https://gist.github.com/ianmcook/a51efc2133f5f67aef84e121a2db46ee. It returns the same result.