Skip to content

Instantly share code, notes, and snippets.

@ianmcook
Created January 22, 2024 23:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ianmcook/a51efc2133f5f67aef84e121a2db46ee to your computer and use it in GitHub Desktop.
Acero Declarations for TPC-H Query 06
#include <iostream>
#include <arrow/api.h>
#include <arrow/type.h>
#include <arrow/result.h>
#include <arrow/io/api.h>
#include <arrow/compute/api.h>
#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <parquet/arrow/reader.h>
/// Runs an Acero plan to completion, gathers all of its output batches into a
/// single in-memory arrow::Table, and prints that table to standard output.
///
/// @param plan  Declaration tree to execute; it is consumed (moved from).
/// @return OK after printing, or the error produced while executing the plan.
arrow::Status ExecutePlanAndCollectAsTable(arrow::acero::Declaration plan) {
  // DeclarationToTable builds, runs and drains the plan in one call.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> collected,
                        arrow::acero::DeclarationToTable(std::move(plan)));

  std::cout << "Results : " << std::endl;
  std::cout << collected->ToString() << std::endl;
  return arrow::Status::OK();
}
/// Runs TPC-H Query 6 against "lineitem.parquet" with an Acero pipeline
///   table_source -> filter -> project -> aggregate
/// and prints the single aggregated column "revenue" to standard output.
///
/// @return OK on success; otherwise the first error raised while opening or
///         reading the Parquet file, or while executing the plan.
arrow::Status Execute() {
  // Read the whole Parquet file into an in-memory table.
  // NOTE(review): the path is resolved relative to the current working directory.
  std::shared_ptr<arrow::io::RandomAccessFile> input;
  ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open("lineitem.parquet"));
  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
  ARROW_RETURN_NOT_OK(
      parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader));
  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));

  // Source node: feed the in-memory table into the plan in batches of at most
  // max_batch_size rows.
  const int max_batch_size = 1000;
  auto table_source_options = arrow::acero::TableSourceNodeOptions{table, max_batch_size};
  arrow::acero::Declaration source{
      "table_source",
      std::move(table_source_options)
  };

  // Filter node: the TPC-H Q6 predicate
  //   l_shipdate >= DATE '1994-01-01' AND l_shipdate < DATE '1995-01-01'
  //   AND l_discount BETWEEN 0.05 AND 0.07
  //   AND l_quantity < 24
  // date32 values count days since 1970-01-01: January 1, 1994 is day 8766
  // and January 1, 1995 is day 9131.
  const arrow::compute::Expression shipped_in_1994 = arrow::compute::call("and", {
      arrow::compute::greater_equal(arrow::compute::field_ref("l_shipdate"),
                                    arrow::compute::literal(arrow::Date32Scalar(8766))),
      arrow::compute::less(arrow::compute::field_ref("l_shipdate"),
                           arrow::compute::literal(arrow::Date32Scalar(9131)))
  });
  const arrow::compute::Expression discount_in_range = arrow::compute::call("and", {
      arrow::compute::greater_equal(arrow::compute::field_ref("l_discount"),
                                    arrow::compute::literal(0.05)),
      arrow::compute::less_equal(arrow::compute::field_ref("l_discount"),
                                 arrow::compute::literal(0.07))
  });
  const arrow::compute::Expression quantity_below_24 =
      arrow::compute::less(arrow::compute::field_ref("l_quantity"),
                           arrow::compute::literal(24.0));
  // Same expression tree as ((date AND discount) AND quantity).
  arrow::compute::Expression filter_expr = arrow::compute::call("and", {
      arrow::compute::call("and", {shipped_in_1994, discount_in_range}),
      quantity_below_24
  });
  auto filter_options = arrow::acero::FilterNodeOptions(filter_expr);
  arrow::acero::Declaration filter{
      "filter",
      {std::move(source)},
      std::move(filter_options)
  };

  // Project node: per-row l_extendedprice * l_discount, emitted as the single
  // column "product".
  auto project_options = arrow::acero::ProjectNodeOptions{
      {
          arrow::compute::call(
              "multiply",
              {arrow::compute::field_ref("l_extendedprice"),
               arrow::compute::field_ref("l_discount")})
      },
      {"product"}
  };
  arrow::acero::Declaration project{
      "project",
      {std::move(filter)},
      std::move(project_options)
  };

  // Aggregate node: with no grouping keys, "sum" reduces the whole "product"
  // column to one row named "revenue".
  // NOTE(review): per Arrow's ScalarAggregateOptions, skip_nulls=true ignores
  // nulls and min_count=1 yields null if no non-null value is seen.
  auto sum_options = std::make_shared<arrow::compute::ScalarAggregateOptions>(
      /*skip_nulls=*/true, /*min_count=*/1);
  auto aggregate_options = arrow::acero::AggregateNodeOptions{
      /*aggregates=*/{{"sum", sum_options, "product", "revenue"}}, /*keys=*/{}};
  arrow::acero::Declaration aggregate{
      "aggregate",
      {std::move(project)},
      std::move(aggregate_options)
  };

  // Execute the plan and propagate its status. Previously the status was
  // stored in an unused local and OK was returned unconditionally, so
  // execution errors were silently swallowed.
  return ExecutePlanAndCollectAsTable(std::move(aggregate));
}
int main(int argc, char** argv) {
auto status = Execute();
if (!status.ok()) {
std::cerr << "Error occurred : " << status.message() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
@ianmcook
Copy link
Author

This is a variation of https://gist.github.com/ianmcook/c96b80f943406dc791f0e9455e11ac11; it returns the same result as that version.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment