Skip to content

Instantly share code, notes, and snippets.

@ianmcook
Last active January 30, 2023 21:55
Show Gist options
  • Save ianmcook/d2b197fd52e0ff304624a99ed1c2b149 to your computer and use it in GitHub Desktop.
Save ianmcook/d2b197fd52e0ff304624a99ed1c2b149 to your computer and use it in GitHub Desktop.
Create and execute an Acero ExecPlan
#include <iostream>
#include <arrow/api.h>
#include <arrow/result.h>
#include <arrow/compute/api.h>
#include <arrow/compute/exec/exec_plan.h>
/// \brief Run an already-assembled ExecPlan and print its output as a Table.
///
/// \param plan      the plan to validate, start and drain
/// \param schema    schema used to interpret the batches produced by sink_gen
/// \param sink_gen  async generator that was handed to the plan's sink node
/// \return the first error encountered (validation, start-up, collection),
///         otherwise the completion status of the plan
arrow::Status ExecutePlanAndCollectAsTable(
    std::shared_ptr<arrow::compute::ExecPlan> plan,
    std::shared_ptr<arrow::Schema> schema,
    arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen) {
  // translate sink_gen (async) to sink_reader (sync)
  std::shared_ptr<arrow::RecordBatchReader> sink_reader =
      arrow::compute::MakeGeneratorReader(schema, std::move(sink_gen),
                                          arrow::default_memory_pool());

  // validate the ExecPlan
  ARROW_RETURN_NOT_OK(plan->Validate());
  std::cout << "ExecPlan created : " << plan->ToString() << std::endl;

  // start the ExecPlan; a start-up failure must not be ignored, otherwise the
  // reader below would be drained against a plan that never ran
  ARROW_RETURN_NOT_OK(plan->StartProducing());

  // collect sink_reader into a Table
  auto table_result = arrow::Table::FromRecordBatchReader(sink_reader.get());
  if (!table_result.ok()) {
    // the plan was started: stop it and wait for it to finish before
    // reporting the collection error to the caller
    plan->StopProducing();
    plan->finished().Wait();
    return table_result.status();
  }
  std::shared_ptr<arrow::Table> response_table = *std::move(table_result);
  std::cout << "Results : " << std::endl << response_table->ToString() << std::endl;

  // stop producing
  plan->StopProducing();

  // wait for the plan to be marked finished and report its final status
  auto future = plan->finished();
  return future.status();
}
/// \brief Build a small in-memory table, run it through an Acero ExecPlan and
///        print the result.
///
/// The plan is: table_source -> filter (int < 4) -> project (int, str,
/// dbl = round(add_checked(int, 1.563))) -> aggregate (hash_max of int as
/// "foo", hash_min of dbl as "bar", keyed by str) -> sink.
///
/// \return the first error raised while building the data or the plan,
///         otherwise the status returned by ExecutePlanAndCollectAsTable
arrow::Status Execute() {
  //std::cout << arrow::GetBuildInfo().version_string << std::endl;

  // "int" column: six values, the third one null
  arrow::Int32Builder int_builder;
  ARROW_RETURN_NOT_OK(int_builder.Append(1));
  ARROW_RETURN_NOT_OK(int_builder.Append(1));
  ARROW_RETURN_NOT_OK(int_builder.AppendNull());
  ARROW_RETURN_NOT_OK(int_builder.Append(2));
  ARROW_RETURN_NOT_OK(int_builder.Append(3));
  ARROW_RETURN_NOT_OK(int_builder.Append(4));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> int_array, int_builder.Finish());

  // "str" column: the grouping key used by the aggregate node
  arrow::StringBuilder str_builder;
  ARROW_RETURN_NOT_OK(str_builder.Append("a"));
  ARROW_RETURN_NOT_OK(str_builder.Append("a"));
  ARROW_RETURN_NOT_OK(str_builder.Append("a"));
  ARROW_RETURN_NOT_OK(str_builder.Append("a"));
  ARROW_RETURN_NOT_OK(str_builder.Append("b"));
  ARROW_RETURN_NOT_OK(str_builder.Append("b"));
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> str_array, str_builder.Finish());

  std::vector<std::shared_ptr<arrow::Field>> schema_vector = {
      arrow::field("int", arrow::int32()),
      arrow::field("str", arrow::utf8())
  };
  auto schema = std::make_shared<arrow::Schema>(schema_vector);
  std::shared_ptr<arrow::Table> table = arrow::Table::Make(schema, {int_array, str_array});
  // std::cout << "Data : " << std::endl << table->ToString() << std::endl;

  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::compute::ExecPlan> plan,
      arrow::compute::ExecPlan::Make(*arrow::compute::threaded_exec_context()));

  // filled in by the sink node below, consumed when the plan is executed
  arrow::AsyncGenerator<std::optional<arrow::compute::ExecBatch>> sink_gen;

  const int max_batch_size = 100;
  auto table_source_options =
      arrow::compute::TableSourceNodeOptions{table, max_batch_size};

  // source node
  ARROW_ASSIGN_OR_RAISE(
      arrow::compute::ExecNode * source_node,
      arrow::compute::MakeExecNode("table_source", plan.get(), {}, table_source_options));

  // filter node
  ARROW_ASSIGN_OR_RAISE(
      arrow::compute::ExecNode * filter_node,
      arrow::compute::MakeExecNode(
          "filter", plan.get(), {source_node},
          arrow::compute::FilterNodeOptions{
              arrow::compute::less(arrow::compute::field_ref("int"),
                                   arrow::compute::literal(4))
          }));

  // project node: passes "int" and "str" through and adds the computed "dbl"
  ARROW_ASSIGN_OR_RAISE(
      arrow::compute::ExecNode * project_node,
      arrow::compute::MakeExecNode(
          "project", plan.get(), {filter_node},
          arrow::compute::ProjectNodeOptions{
              {
                  arrow::compute::field_ref("int"),
                  arrow::compute::field_ref("str"),
                  arrow::compute::call(
                      "round",
                      {arrow::compute::call(
                          "add_checked",
                          {arrow::compute::field_ref("int"), arrow::compute::literal(1.563)}
                      )},
                      arrow::compute::RoundOptions(1, arrow::compute::RoundMode::HALF_TO_EVEN)
                  )
              },
              {"int", "str", "dbl"}
          }));
  //std::cout << "Schema after projection : \n" << project_node->output_schema()->ToString() << std::endl;

  // aggregate node
  // hash_min / hash_max are configured through ScalarAggregateOptions (not
  // CountOptions); the defaults skip null inputs.
  auto options = std::make_shared<arrow::compute::ScalarAggregateOptions>();
  auto aggregate_options = arrow::compute::AggregateNodeOptions{
      /*aggregates=*/{{"hash_max", options, "int", "foo"},
                      {"hash_min", options, "dbl", "bar"}},
      /*keys=*/{"str"}};
  ARROW_ASSIGN_OR_RAISE(
      arrow::compute::ExecNode * aggregate_node,
      arrow::compute::MakeExecNode("aggregate", plan.get(), {project_node}, aggregate_options));

  // sink node
  ARROW_RETURN_NOT_OK(
      arrow::compute::MakeExecNode("sink", plan.get(), {aggregate_node},
                                   arrow::compute::SinkNodeOptions{&sink_gen}));

  // propagate the execution status instead of unconditionally reporting OK
  return ExecutePlanAndCollectAsTable(plan, aggregate_node->output_schema(), sink_gen);
}
int main(int argc, char** argv) {
auto status = Execute();
if (!status.ok()) {
std::cerr << "Error occurred : " << status.message() << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
@ianmcook
Copy link
Author

You might need to do this before compiling:

export CPLUS_INCLUDE_PATH=/usr/local/opt/llvm/include/c++/v1:/Library/Developer/CommandLineTools/SDKs/MacOSX.sdk/usr/include

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment