Skip to content

Instantly share code, notes, and snippets.

@amoeba
Last active October 26, 2023 23:35
Show Gist options
  • Save amoeba/2b9703ee49e6465d8b424e67b78770ee to your computer and use it in GitHub Desktop.
Save amoeba/2b9703ee49e6465d8b424e67b78770ee to your computer and use it in GitHub Desktop.
Arrow C++ example filtering a random set of rows from a memory-mapped Arrow IPC file using Acero
#include <cstdint>
#include <exception>
#include <iostream>
#include <memory>
#include <random>
#include <string>
#include <vector>
#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/acero/util.h"
#include "arrow/array/data.h"
#include "arrow/builder.h"
#include "arrow/compute/api_scalar.h"
#include "arrow/compute/expression.h"
#include "arrow/datum.h"
#include "arrow/io/file.h"
#include "arrow/ipc/api.h"
#include "arrow/ipc/reader.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/table.h"
// Source used once, during static initialization, to obtain the seed below.
std::random_device os_seed;
// Process-wide seed: a single draw from `os_seed`, fixed for the lifetime of
// the program. Every engine constructed from it starts in the same state.
const uint_least64_t seed{os_seed()};
// Draws `size` integers uniformly from the closed range [start, end] and
// returns them in a shared, read-only vector.
//
// A fresh std::mt19937_64 is constructed from the file-level `seed` on every
// call, so repeated calls with the same arguments within one process produce
// the same sequence. Values are drawn independently and may repeat.
//
// Callers must pass start <= end (a precondition of
// std::uniform_int_distribution) and a non-negative `size`.
std::shared_ptr<const std::vector<int64_t>>
GenerateRandomInt64(const int64_t start, const int64_t end,
                    const int64_t size) {
  std::mt19937_64 engine(seed);
  std::uniform_int_distribution<int_least64_t> pick(start, end);

  auto samples = std::make_shared<std::vector<int64_t>>();
  samples->reserve(size);

  // Count down instead of indexing up; the number of draws is unchanged.
  int64_t remaining = size;
  while (remaining > 0) {
    samples->push_back(pick(engine));
    --remaining;
  }
  return samples;
}
// Builds an Arrow Int64 array holding `size` random values drawn from the
// closed range [start, end] (see GenerateRandomInt64).
//
// Returns:
//   - arrow::Status::Invalid if `size` is negative or `start > end`;
//   - the builder's error status if appending or finishing fails;
//   - otherwise the finished array.
arrow::Result<std::shared_ptr<arrow::Array>>
GenerateRandomArray(const int64_t start, const int64_t end,
                    const int64_t size) {
  // A negative size would be converted to a huge unsigned value by
  // vector::reserve and throw; report it through the Status channel instead.
  if (size < 0) {
    return arrow::Status::Invalid("size must be non-negative, got ", size);
  }
  // std::uniform_int_distribution requires start <= end; anything else is
  // undefined behavior, so reject it up front.
  if (start > end) {
    return arrow::Status::Invalid("empty range: start (", start,
                                  ") is greater than end (", end, ")");
  }

  const auto values = GenerateRandomInt64(start, end, size);

  arrow::Int64Builder builder;
  const auto append_status =
      builder.AppendValues(values->begin(), values->end());
  if (!append_status.ok()) {
    return append_status;
  }
  // Finish() already yields a Result carrying either the array or the error.
  return builder.Finish();
}
// Opens the file at `path` as a read-only memory-mapped file and hands it to
// arrow::ipc::RecordBatchFileReader::Open with default IPC read options.
//
// Returns the status of the memory-map open if that fails; otherwise whatever
// RecordBatchFileReader::Open returns (a reader or its error).
arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchFileReader>>
OpenIPCFileMMapped(const std::string path) {
  auto mapped_file =
      arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::type::READ);
  if (mapped_file.ok()) {
    auto source = mapped_file.ValueOrDie();
    return arrow::ipc::RecordBatchFileReader::Open(
        source, arrow::ipc::IpcReadOptions::Defaults());
  }
  return mapped_file.status();
}
// Adapts an IPC file reader to the generic arrow::RecordBatchReader interface
// by calling ToTable() on it and wrapping the resulting table in an
// arrow::TableBatchReader.
//
// Returns the ToTable() error status on failure, otherwise the new reader.
arrow::Result<std::shared_ptr<arrow::RecordBatchReader>>
FileToRecordBatchReader(
    const std::shared_ptr<arrow::ipc::RecordBatchFileReader> file_reader) {
  auto loaded = file_reader->ToTable();
  if (loaded.ok()) {
    auto table = loaded.ValueOrDie();
    // Upcast explicitly so the Result is built from the base reader type.
    std::shared_ptr<arrow::RecordBatchReader> batch_reader =
        std::make_shared<arrow::TableBatchReader>(table);
    return batch_reader;
  }
  return loaded.status();
}
// Filters the Arrow IPC file at `path` down to the rows whose `col_name` value
// is contained in a randomly generated set of `sample_size` integers drawn
// from [1, number of rows in the file], then prints how many rows survived.
//
// The random values may repeat, so the number of matching rows can be smaller
// than `sample_size`.
// NOTE(review): this presumably expects `col_name` to hold 1-based row ids;
// confirm against the input file.
//
// Returns the first non-OK status produced by any step, Status::Invalid if
// the file contains no rows, or Status::OK on success.
arrow::Status RunMain(const std::string path, const std::string col_name,
                      const int sample_size) {
  // Step 1: Open the IPC file as a MemoryMapped file
  auto file_reader_result = OpenIPCFileMMapped(path);
  if (!file_reader_result.ok()) {
    return file_reader_result.status();
  }
  auto file_reader = file_reader_result.ValueOrDie();

  // Step 2: Convert to a RecordBatchReader so Acero can consume it
  auto reader_result = FileToRecordBatchReader(file_reader);
  if (!reader_result.ok()) {
    // Report this step's failure; the open result above is OK at this point
    // and returning it would hide the error from the caller.
    return reader_result.status();
  }
  auto reader = reader_result.ValueOrDie();

  // Step 3: Create our random row indices to filter with
  auto count_rows_result = file_reader->CountRows();
  if (!count_rows_result.ok()) {
    return count_rows_result.status();
  }
  const auto num_rows = count_rows_result.ValueOrDie();
  if (num_rows < 1) {
    // The sampling range [1, num_rows] would be empty.
    return arrow::Status::Invalid("Cannot sample rows from '", path,
                                  "': the file contains no rows");
  }
  auto gen_result = GenerateRandomArray(1, num_rows, sample_size);
  if (!gen_result.ok()) {
    return gen_result.status();
  }
  auto values = gen_result.ValueOrDie();

  // Step 4: Create and execute an ExecPlan that filters the RecordBatches
  arrow::acero::Declaration decl = arrow::acero::Declaration::Sequence({
      {"record_batch_reader_source",
       arrow::acero::RecordBatchReaderSourceNodeOptions(reader)},
      {"filter", arrow::acero::FilterNodeOptions{arrow::compute::call(
                     "is_in", {arrow::compute::field_ref(col_name)},
                     arrow::compute::SetLookupOptions{values})}},
  });
  auto batches_result =
      arrow::acero::DeclarationToExecBatches(decl, /*use_threads=*/true);
  if (!batches_result.ok()) {
    // Dereferencing a failed Result aborts the process; surface the error.
    return batches_result.status();
  }
  auto tbl_result = arrow::acero::TableFromExecBatches(
      reader->schema(), batches_result->batches);
  if (!tbl_result.ok()) {
    return tbl_result.status();
  }
  auto tbl = tbl_result.ValueOrDie();

  std::cout << "Successfully filtered to " << tbl->num_rows() << " rows."
            << std::endl;
  return arrow::Status::OK();
}
// Command-line entry point.
//
// Usage: ./example <path_to_ipc_file> <sample_size>
//
// Returns 0 on success and -1 when the arguments are missing or invalid, or
// when RunMain() reports an error.
int main(int argc, char **argv) {
  if (argc < 3) {
    std::cout << "Usage: ./example path_to_ipc_file.arrow 1234" << std::endl;
    return -1;
  }
  auto path = argv[1];

  // std::stoi throws on non-numeric or out-of-range input; turn that into a
  // normal error exit instead of letting the exception terminate the program.
  const std::string sample_arg = argv[2];
  int sample_size = 0;
  bool parsed_ok = false;
  try {
    std::size_t consumed = 0;
    sample_size = std::stoi(sample_arg, &consumed);
    // Reject trailing garbage such as "12abc", which stoi would accept as 12.
    parsed_ok = (consumed == sample_arg.size());
  } catch (const std::exception &) {
    parsed_ok = false;
  }
  if (!parsed_ok || sample_size < 0) {
    std::cout << "Invalid sample size '" << sample_arg
              << "': expected a non-negative integer" << std::endl;
    return -1;
  }

  std::string col_name = "i"; // Hardcoded for sake of the example
  std::cout << "Filtering " << path << " to ~" << sample_size
            << " random rows..." << std::endl;
  auto result = RunMain(path, col_name, sample_size);
  if (!result.ok()) {
    std::cout << "RunMain() failed with error message: " << result.message()
              << std::endl;
    return -1;
  }
  return 0;
}
@amoeba
Copy link
Author

amoeba commented Oct 26, 2023

I edited the gist to use ValueOrDie like our docs suggest but I'm actually going to refactor all of the status/result stuff to use ARROW_ASSIGN_OR_RAISE.

@EpsilonPrime
Copy link

ARROW_ASSIGN_OR_RAISE seems appropriate. I wish it said RETURN instead of RAISE since it doesn't actually raise an exception though. ;)

@amoeba
Copy link
Author

amoeba commented Oct 26, 2023

Yeah, the name is confusing.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment