Skip to content

Instantly share code, notes, and snippets.

@amoeba
Last active October 26, 2023 23:35
Show Gist options
  • Save amoeba/2b9703ee49e6465d8b424e67b78770ee to your computer and use it in GitHub Desktop.
Save amoeba/2b9703ee49e6465d8b424e67b78770ee to your computer and use it in GitHub Desktop.
Arrow C++ example filtering a random set of rows from a memory-mapped Arrow IPC file using Acero
#include <exception>
#include <iostream>
#include <memory>
#include <random>
#include <string>
#include <vector>
#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/acero/util.h"
#include "arrow/array/data.h"
#include "arrow/builder.h"
#include "arrow/compute/api_scalar.h"
#include "arrow/compute/expression.h"
#include "arrow/datum.h"
#include "arrow/io/file.h"
#include "arrow/ipc/api.h"
#include "arrow/ipc/reader.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/table.h"
// Entropy source queried once, during static initialization, to produce
// `seed` below.
std::random_device os_seed;
// Seed shared by every call to GenerateRandomInt64. Each call constructs a
// fresh engine from this same value, so identical arguments produce identical
// sequences within one run of the program.
const uint_least64_t seed = os_seed();

// Generates `size` pseudo-random integers uniformly distributed over the
// closed interval [start, end].
//
// Parameters:
//   start - smallest value that may be produced (inclusive)
//   end   - largest value that may be produced (inclusive)
//   size  - number of values to generate
//
// Returns a shared vector holding the generated values. The vector is empty
// when `size` is not positive or when the interval is empty (start > end).
std::shared_ptr<const std::vector<int64_t>>
GenerateRandomInt64(const int64_t start, const int64_t end,
                    const int64_t size) {
  auto values = std::make_shared<std::vector<int64_t>>();
  // std::uniform_int_distribution requires start <= end (anything else is
  // undefined behavior), and a negative count passed to reserve() wraps to a
  // huge unsigned value and throws std::length_error. Neither case has any
  // values to produce, so return the empty vector instead.
  if (size <= 0 || start > end) {
    return values;
  }
  std::mt19937_64 generator(seed);
  std::uniform_int_distribution<int_least64_t> distribute(start, end);
  values->reserve(static_cast<std::vector<int64_t>::size_type>(size));
  for (int64_t i = 0; i < size; ++i) {
    values->push_back(distribute(generator));
  }
  return values;
}
// Wraps the output of GenerateRandomInt64(start, end, size) in an Arrow
// Int64 array.
//
// Returns the finished array, or the error Status reported by the builder
// when appending the values or finishing the array fails.
arrow::Result<std::shared_ptr<arrow::Array>>
GenerateRandomArray(const int64_t start, const int64_t end,
                    const int64_t size) {
  const auto random_values = GenerateRandomInt64(start, end, size);

  arrow::Int64Builder int_builder;
  const arrow::Status appended =
      int_builder.AppendValues(random_values->begin(), random_values->end());
  if (!appended.ok()) {
    return appended;
  }

  // Finish() already yields a Result that carries either the array or the
  // failure status, so it can be handed back to the caller unchanged.
  return int_builder.Finish();
}
// Opens the file at `path` as a read-only memory-mapped file and creates an
// Arrow IPC file reader on top of it using the default IPC read options.
//
// Returns the reader, or the error Status from mapping the file or from
// opening the IPC reader.
arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchFileReader>>
OpenIPCFileMMapped(const std::string path) {
  auto mapped =
      arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::type::READ);
  if (mapped.ok()) {
    auto mapped_file = mapped.ValueOrDie();
    const auto ipc_options = arrow::ipc::IpcReadOptions::Defaults();
    return arrow::ipc::RecordBatchFileReader::Open(mapped_file, ipc_options);
  }
  // Mapping failed: propagate the reason to the caller.
  return mapped.status();
}
// Adapts an IPC file reader to the RecordBatchReader interface by reading the
// file into a Table via ToTable() and wrapping that table in a
// TableBatchReader.
//
// Returns the batch reader, or the error Status from ToTable().
arrow::Result<std::shared_ptr<arrow::RecordBatchReader>>
FileToRecordBatchReader(
    const std::shared_ptr<arrow::ipc::RecordBatchFileReader> file_reader) {
  auto loaded = file_reader->ToTable();
  if (loaded.ok()) {
    // Declared with the base-class pointer type so it matches the Result's
    // value type exactly.
    std::shared_ptr<arrow::RecordBatchReader> batch_reader =
        std::make_shared<arrow::TableBatchReader>(loaded.ValueOrDie());
    return batch_reader;
  }
  return loaded.status();
}
// Filters the Arrow IPC file at `path` with Acero, keeping the rows whose
// `col_name` value is a member of a randomly generated set of integers, and
// prints the number of rows that survive the filter.
//
// Parameters:
//   path        - location of the Arrow IPC file to read
//   col_name    - name of the column tested with the "is_in" function
//   sample_size - how many random values to generate for the lookup set; the
//                 values are drawn from [1, number of rows in the file]
//
// Returns Status::OK() on success, otherwise the Status of the first step
// that failed.
arrow::Status RunMain(const std::string path, const std::string col_name,
                      const int sample_size) {
  // Step 1: Open the IPC file as a MemoryMapped file
  auto file_reader_result = OpenIPCFileMMapped(path);
  if (!file_reader_result.ok()) {
    return file_reader_result.status();
  }
  auto file_reader = file_reader_result.ValueOrDie();

  // Step 2: Convert to a RecordBatchReader so Acero can consume it
  auto reader_result = FileToRecordBatchReader(file_reader);
  if (!reader_result.ok()) {
    // Report the status of *this* step. Returning file_reader_result's status
    // here would hand back OK and hide the failure from the caller.
    return reader_result.status();
  }
  auto reader = reader_result.ValueOrDie();

  // Step 3: Create our random row indices to filter with
  auto count_rows_result = file_reader->CountRows();
  if (!count_rows_result.ok()) {
    return count_rows_result.status();
  }
  auto num_rows = count_rows_result.ValueOrDie();
  auto gen_result = GenerateRandomArray(1, num_rows, sample_size);
  if (!gen_result.ok()) {
    return gen_result.status();
  }
  auto values = gen_result.ValueOrDie();

  // Step 4: Create and execute an ExecPlan that filters the RecordBatches
  arrow::acero::Declaration decl = arrow::acero::Declaration::Sequence({
      {"record_batch_reader_source",
       arrow::acero::RecordBatchReaderSourceNodeOptions({reader})},
      {"filter", arrow::acero::FilterNodeOptions{arrow::compute::call(
                     "is_in", {arrow::compute::field_ref(col_name)},
                     arrow::compute::SetLookupOptions{values})}},
  });
  auto result =
      arrow::acero::DeclarationToExecBatches(decl, /*use_threads=*/true);
  // The plan can fail at run time (for example when `col_name` is not in the
  // schema); check before touching the batches.
  if (!result.ok()) {
    return result.status();
  }

  auto tbl_result =
      arrow::acero::TableFromExecBatches(reader->schema(), result->batches);
  if (!tbl_result.ok()) {
    return tbl_result.status();
  }
  auto tbl = tbl_result.ValueOrDie();
  std::cout << "Successfully filtered to " << tbl->num_rows() << " rows."
            << std::endl;
  return arrow::Status::OK();
}
// Command-line entry point.
//
// Expects two arguments: the path to an Arrow IPC file and the number of
// random values to sample. Returns 0 when RunMain succeeds and -1 on a usage
// error, an invalid sample size, or a RunMain failure.
int main(int argc, char **argv) {
  if (argc < 3) {
    std::cout << "Usage: ./example path_to_ipc_file.arrow 1234" << std::endl;
    return -1;
  }
  auto path = argv[1];

  // std::stoi throws on non-numeric or out-of-range input; turn that into a
  // readable message instead of letting the exception terminate the program.
  int sample_size = 0;
  try {
    sample_size = std::stoi(argv[2]);
  } catch (const std::exception &) {
    sample_size = -1;
  }
  // A negative count cannot be sampled, so reject it together with
  // unparsable input.
  if (sample_size < 0) {
    std::cout << "Invalid sample size '" << argv[2]
              << "': expected a non-negative integer." << std::endl;
    std::cout << "Usage: ./example path_to_ipc_file.arrow 1234" << std::endl;
    return -1;
  }

  std::string col_name = "i"; // Hardcoded for sake of the example
  std::cout << "Filtering " << path << " to ~" << sample_size
            << " random rows..." << std::endl;
  auto result = RunMain(path, col_name, sample_size);
  if (!result.ok()) {
    std::cout << "RunMain() failed with error message: " << result.message()
              << std::endl;
    return -1;
  }
  return 0;
}
@amoeba
Copy link
Author

amoeba commented Oct 26, 2023

Calling ValueOrDie and then returning an OK error status seems odd. Either remove the error return or handle the error.

Thanks for catching that @EpsilonPrime, that seems silly to me now.

@amoeba
Copy link
Author

amoeba commented Oct 26, 2023

I prefer to check values in sample code (thinking of ValuesUnsafe) and show proper error handling. A lot of sample code skips that for readability. Depends on your purpose.

I like how that sounds, what does "show proper error handling" look like to you? I'm trying to follow https://arrow.apache.org/docs/cpp/api/support.html#_CPPv4I0EN5arrow6ResultE but instead of printing the status, I'm trying to propagate the status out of RunMain.

@amoeba
Copy link
Author

amoeba commented Oct 26, 2023

I edited the gist to use ValueOrDie like our docs suggest but I'm actually going to refactor all of the status/result stuff to use ARROW_ASSIGN_OR_RAISE.

@EpsilonPrime
Copy link

ARROW_ASSIGN_OR_RAISE seems appropriate. I wish it said RETURN instead of RAISE since it doesn't actually raise an exception though. ;)

@amoeba
Copy link
Author

amoeba commented Oct 26, 2023

Yeah, the name is confusing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment