Skip to content

Instantly share code, notes, and snippets.

@ianmcook
Last active January 22, 2024 23:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ianmcook/c96b80f943406dc791f0e9455e11ac11 to your computer and use it in GitHub Desktop.
Acero ExecPlan for TPC-H Query 06
#include <iostream>
#include <arrow/api.h>
#include <arrow/type.h>
#include <arrow/result.h>
#include <arrow/io/api.h>
#include <arrow/compute/api.h>
#include <arrow/acero/exec_plan.h>
#include <arrow/acero/options.h>
#include <parquet/arrow/reader.h>
/// Validates and runs an already-built ExecPlan, prints the plan and the
/// collected result table to stdout, and waits for the plan to finish.
///
/// @param plan      The plan to run. Its "sink" node must already have been
///                  wired to `sink_gen` by the caller.
/// @param schema    Schema of the batches emitted by the sink; used to build
///                  the RecordBatchReader that drains `sink_gen`.
/// @param sink_gen  Generator populated by the plan's "sink" node.
/// @return The validation error if the plan is invalid; otherwise the error
///         from collecting the results if that failed; otherwise the plan's
///         final status.
arrow::Status ExecutePlanAndCollectAsTable(
    std::shared_ptr<arrow::acero::ExecPlan> plan,
    std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen) {
  // Translate the asynchronous sink generator into a synchronous reader.
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      arrow::acero::MakeGeneratorReader(schema, std::move(sink_gen),
                                        arrow::default_memory_pool());

  // Validate before starting. The plan has not been started yet, so an early
  // return here needs no StopProducing()/finished() cleanup.
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;

  plan->StartProducing();

  // Collect into a Result instead of using ARROW_ASSIGN_OR_RAISE: an early
  // return at this point would skip StopProducing()/finished() and leave the
  // started plan running after this function returns.
  arrow::Result<std::shared_ptr<arrow::Table>> maybe_table =
      arrow::Table::FromRecordBatchReader(sink_reader.get());
  if (maybe_table.ok()) {
    std::cout << "Results : " << std::endl
              << (*maybe_table)->ToString() << std::endl;
  }

  // Stop the plan and wait for it on both the success and the failure path.
  plan->StopProducing();
  auto future = plan->finished();
  const arrow::Status plan_status = future.status();

  // A collection failure is the more specific error, so report it first.
  ARROW_RETURN_NOT_OK(maybe_table.status());
  return plan_status;
}
/// Runs TPC-H Query 6 ("Forecasting Revenue Change") against the file
/// "lineitem.parquet", opened by relative path from the working directory.
///
/// Reads the whole Parquet file into an in-memory Table, then builds the plan
///   table_source -> filter -> project -> aggregate(sum) -> sink
/// and hands it to ExecutePlanAndCollectAsTable, which prints the plan and
/// the result.
///
/// @return The first error encountered while reading the file, building the
///         plan, or executing it; OK otherwise.
arrow::Status Execute() {
  // Read the entire Parquet file into memory.
  std::shared_ptr<arrow::io::RandomAccessFile> input;
  ARROW_ASSIGN_OR_RAISE(input, arrow::io::ReadableFile::Open("lineitem.parquet"));
  std::unique_ptr<parquet::arrow::FileReader> arrow_reader;
  ARROW_RETURN_NOT_OK(parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &arrow_reader));
  std::shared_ptr<arrow::Table> table;
  ARROW_RETURN_NOT_OK(arrow_reader->ReadTable(&table));

  // Make the ExecPlan on the threaded execution context.
  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::acero::ExecPlan> plan,
      arrow::acero::ExecPlan::Make(*arrow::compute::threaded_exec_context())
  );

  // Make the source node, which feeds the in-memory table into the plan
  // (max_batch_size is passed to TableSourceNodeOptions as the batch cap).
  arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen;
  int max_batch_size = 1000;
  auto table_source_options = arrow::acero::TableSourceNodeOptions{table, max_batch_size};
  ARROW_ASSIGN_OR_RAISE(
      arrow::acero::ExecNode * source_node,
      arrow::acero::MakeExecNode("table_source", plan.get(), {}, table_source_options)
  );

  // Make the filter node with the Q6 predicate:
  //   l_shipdate >= DATE '1994-01-01' AND l_shipdate < DATE '1995-01-01'
  //   AND l_discount BETWEEN 0.05 AND 0.07 AND l_quantity < 24
  // Date32 values count days since 1970-01-01.
  arrow::compute::Expression filter_expr = arrow::compute::call("and", {
      arrow::compute::call("and", {
          arrow::compute::call("and", {
              // January 1, 1994 is 8766 days after January 1, 1970
              arrow::compute::greater_equal(arrow::compute::field_ref("l_shipdate"), arrow::compute::literal(arrow::Date32Scalar(8766))),
              // January 1, 1995 is 9131 days after January 1, 1970
              arrow::compute::less(arrow::compute::field_ref("l_shipdate"), arrow::compute::literal(arrow::Date32Scalar(9131)))
          }),
          arrow::compute::call("and", {
              arrow::compute::greater_equal(arrow::compute::field_ref("l_discount"), arrow::compute::literal(0.05)),
              arrow::compute::less_equal(arrow::compute::field_ref("l_discount"), arrow::compute::literal(0.07))
          }),
      }),
      arrow::compute::less(arrow::compute::field_ref("l_quantity"), arrow::compute::literal(24.0))
  });
  ARROW_ASSIGN_OR_RAISE(
      arrow::acero::ExecNode * filter_node,
      arrow::acero::MakeExecNode("filter", plan.get(), {source_node},
                                 arrow::acero::FilterNodeOptions{filter_expr}
      )
  );

  // Make the project node: product = l_extendedprice * l_discount.
  ARROW_ASSIGN_OR_RAISE(
      arrow::acero::ExecNode * project_node,
      arrow::acero::MakeExecNode("project", plan.get(), {filter_node},
                                 arrow::acero::ProjectNodeOptions{
                                     {
                                         arrow::compute::call(
                                             "multiply",
                                             {arrow::compute::field_ref("l_extendedprice"), arrow::compute::field_ref("l_discount")}
                                         )
                                     },
                                     {"product"}
                                 }
      )
  );

  // Make the aggregate node: revenue = sum(product) with no grouping keys.
  auto options = std::make_shared<arrow::compute::ScalarAggregateOptions>(/*skip_nulls=*/true, /*min_count=*/1);
  auto aggregate_options =
      arrow::acero::AggregateNodeOptions{/*aggregates=*/{{"sum", options, "product", "revenue"}}, /*keys=*/{}};
  ARROW_ASSIGN_OR_RAISE(
      arrow::acero::ExecNode * aggregate_node,
      arrow::acero::MakeExecNode("aggregate", plan.get(), {project_node}, aggregate_options)
  );

  // Make the sink node, which wires its output into sink_gen.
  ARROW_RETURN_NOT_OK(
      arrow::acero::MakeExecNode("sink", plan.get(), {aggregate_node}, arrow::acero::SinkNodeOptions{&sink_gen})
  );

  // Execute the plan and propagate its status; previously the result was
  // stored in an unused local and OK was returned unconditionally, hiding
  // every execution failure from the caller.
  return ExecutePlanAndCollectAsTable(plan, aggregate_node->output_schema(), sink_gen);
}
int main(int argc, char** argv) {
auto status = Execute();
if (!status.ok()) {
std::cerr << "Error occurred : " << status.message() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
@ianmcook
Copy link
Author

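Compile with (requires a C++17 compiler and the Arrow, Acero, and Parquet C++ libraries installed under /usr/local):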

clang++ acero_tpch_06.cpp -std=c++17 -I/usr/local/include -L/usr/local/lib -larrow -lparquet -larrow_acero -o acero_tpch_06

@ianmcook
Copy link
Author

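Follow the instructions at https://github.com/ljishen/tpch-data to generate lineitem.parquet, then place it in the directory you run acero_tpch_06 from (the program opens it by relative path).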

@ianmcook
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment