Skip to content

Instantly share code, notes, and snippets.

@ianmcook
Created January 22, 2024 23:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ianmcook/32393f59c609591f040f5e9b4f315307 to your computer and use it in GitHub Desktop.
Save ianmcook/32393f59c609591f040f5e9b4f315307 to your computer and use it in GitHub Desktop.
Acero Sequence of Declarations for TPC-H Query 06
#include <cstdint>
#include <cstdlib>
#include <iostream>
#include <memory>
#include <utility>

#include <arrow/api.h>
#include <arrow/type.h>
#include <arrow/result.h>
#include <arrow/io/api.h>
#include <arrow/compute/api.h>
#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <parquet/arrow/reader.h>
/// Runs an Acero plan to completion, gathers its output into a single
/// arrow::Table, and prints that table to stdout.
///
/// @param plan  Declaration tree describing the plan. It is consumed (moved
///              into arrow::acero::DeclarationToTable).
/// @return OK on success; otherwise the error status produced while building
///         or executing the plan. Nothing is printed in the error case.
arrow::Status ExecutePlanAndCollectAsTable(arrow::acero::Declaration plan) {
  // DeclarationToTable executes the plan and accumulates every output batch
  // into one Table; on failure the macro returns the error status early.
  ARROW_ASSIGN_OR_RAISE(auto response_table,
                        arrow::acero::DeclarationToTable(std::move(plan)));
  // '\n' instead of std::endl: no need to force a flush after each line.
  std::cout << "Results : " << '\n' << response_table->ToString() << '\n';
  return arrow::Status::OK();
}
/// Runs TPC-H Query 6 against "lineitem.parquet" (opened relative to the
/// current working directory) and prints the single-row "revenue" result.
///
/// The query is expressed as a linear Acero plan:
///   table_source -> filter -> project -> aggregate
/// which computes
///   SUM(l_extendedprice * l_discount) AS revenue
///   WHERE l_shipdate >= 1994-01-01 AND l_shipdate < 1995-01-01
///     AND l_discount BETWEEN 0.05 AND 0.07
///     AND l_quantity < 24
///
/// @return OK on success; otherwise the first error encountered while opening
///         or reading the Parquet file, or while building/executing the plan.
arrow::Status Execute() {
  namespace cp = arrow::compute;

  // Read the entire Parquet file into an in-memory arrow::Table.
  std::shared_ptr<arrow::io::RandomAccessFile> input;
  ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open("lineitem.parquet"));
  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
  ARROW_RETURN_NOT_OK(
      parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader));
  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));

  // Source node: feed the in-memory table into the plan in batches of at most
  // this many rows.
  constexpr int max_batch_size = 1000;
  auto table_source_options =
      arrow::acero::TableSourceNodeOptions{table, max_batch_size};

  // Filter node. Date32 values count days since 1970-01-01.
  constexpr int32_t kDays_1994_01_01 = 8766;                   // 24 * 365 + 6 leap days
  constexpr int32_t kDays_1995_01_01 = kDays_1994_01_01 + 365;  // 1994 is not a leap year
  const cp::Expression shipdate_in_1994 = cp::call(
      "and",
      {cp::greater_equal(cp::field_ref("l_shipdate"),
                         cp::literal(arrow::Date32Scalar(kDays_1994_01_01))),
       cp::less(cp::field_ref("l_shipdate"),
                cp::literal(arrow::Date32Scalar(kDays_1995_01_01)))});
  // l_discount BETWEEN 0.06 - 0.01 AND 0.06 + 0.01, inclusive on both ends.
  const cp::Expression discount_in_range = cp::call(
      "and",
      {cp::greater_equal(cp::field_ref("l_discount"), cp::literal(0.05)),
       cp::less_equal(cp::field_ref("l_discount"), cp::literal(0.07))});
  const cp::Expression quantity_below_24 =
      cp::less(cp::field_ref("l_quantity"), cp::literal(24.0));
  // "and" is a binary kernel, so the three predicates are combined pairwise.
  const cp::Expression filter_expr = cp::call(
      "and",
      {cp::call("and", {shipdate_in_1994, discount_in_range}), quantity_below_24});
  auto filter_options = arrow::acero::FilterNodeOptions(filter_expr);

  // Project node: product = l_extendedprice * l_discount.
  auto project_options = arrow::acero::ProjectNodeOptions{
      {cp::call("multiply",
                {cp::field_ref("l_extendedprice"), cp::field_ref("l_discount")})},
      {"product"}};

  // Aggregate node: revenue = SUM(product) over all rows (no grouping keys).
  auto sum_options = std::make_shared<cp::ScalarAggregateOptions>(
      /*skip_nulls=*/true, /*min_count=*/1);
  auto aggregate_options = arrow::acero::AggregateNodeOptions{
      /*aggregates=*/{{"sum", sum_options, "product", "revenue"}}, /*keys=*/{}};

  // Chain the nodes so each one consumes the output of the previous one.
  arrow::acero::Declaration plan = arrow::acero::Declaration::Sequence({
      {"table_source", std::move(table_source_options)},
      {"filter", std::move(filter_options)},
      {"project", std::move(project_options)},
      {"aggregate", std::move(aggregate_options)},
  });

  // Propagate any execution error instead of discarding it and reporting OK.
  return ExecutePlanAndCollectAsTable(std::move(plan));
}
int main(int argc, char** argv) {
auto status = Execute();
if (!status.ok()) {
std::cerr << "Error occurred : " << status.message() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
@ianmcook
Copy link
Author

This is a variation of https://gist.github.com/ianmcook/a51efc2133f5f67aef84e121a2db46ee. It returns the same result as that version.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment