-
-
Save amoeba/2b9703ee49e6465d8b424e67b78770ee to your computer and use it in GitHub Desktop.
#include <cstddef>
#include <cstdint>
#include <exception>
#include <iostream>
#include <memory>
#include <random>
#include <string>
#include <vector>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/acero/util.h"
#include "arrow/array/data.h"
#include "arrow/builder.h"
#include "arrow/compute/api_scalar.h"
#include "arrow/compute/expression.h"
#include "arrow/datum.h"
#include "arrow/io/file.h"
#include "arrow/ipc/api.h"
#include "arrow/ipc/reader.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/table.h"
// Process-wide seed: one draw from std::random_device, taken during static
// initialization and reused by every call to GenerateRandomInt64.
std::random_device os_seed;
const uint_least64_t seed = os_seed();

/// Draws `size` pseudo-random integers uniformly from the closed range
/// [start, end].
///
/// The generator is re-created from the same process-wide `seed` on every
/// call, so within one run of the program identical arguments produce an
/// identical sequence.
///
/// @param start Smallest value that may be produced (inclusive).
/// @param end   Largest value that may be produced (inclusive).
/// @param size  Number of values to draw.
/// @return A shared, read-only vector holding the drawn values. The vector is
///         empty when `size <= 0` or when the range is empty (`start > end`).
std::shared_ptr<const std::vector<int64_t>>
GenerateRandomInt64(const int64_t start, const int64_t end,
                    const int64_t size) {
  auto values = std::make_shared<std::vector<int64_t>>();

  // std::uniform_int_distribution has undefined behaviour when its lower
  // bound exceeds its upper bound, and a negative size would be converted to
  // a huge unsigned reserve() request. Nothing can be drawn in either case.
  if (start > end || size <= 0) {
    return values;
  }

  std::mt19937_64 generator(seed);
  std::uniform_int_distribution<int64_t> distribute(start, end);

  values->reserve(static_cast<std::size_t>(size));
  for (int64_t i = 0; i < size; ++i) {
    values->push_back(distribute(generator));
  }
  return values;
}
/// Builds an Arrow int64 array of `size` pseudo-random values drawn from the
/// closed range [start, end] via GenerateRandomInt64.
///
/// @param start Smallest value that may be produced (inclusive).
/// @param end   Largest value that may be produced (inclusive).
/// @param size  Number of values to generate; must not be negative.
/// @return The finished array, or an error Status when the arguments are
///         invalid or the builder fails to append or finish.
arrow::Result<std::shared_ptr<arrow::Array>>
GenerateRandomArray(const int64_t start, const int64_t end,
                    const int64_t size) {
  // Reject arguments the generator cannot honour instead of passing an
  // inverted range or a negative count down to it.
  if (start > end) {
    return arrow::Status::Invalid("GenerateRandomArray: start (", start,
                                  ") must not exceed end (", end, ")");
  }
  if (size < 0) {
    return arrow::Status::Invalid("GenerateRandomArray: size (", size,
                                  ") must not be negative");
  }

  const auto values = GenerateRandomInt64(start, end, size);

  arrow::Int64Builder builder;
  ARROW_RETURN_NOT_OK(builder.AppendValues(values->begin(), values->end()));
  return builder.Finish();
}
/// Memory-maps the file at `path` read-only and opens it as an Arrow IPC
/// file reader using the default IPC read options.
///
/// @param path Path of the Arrow IPC file to open.
/// @return The opened reader, or the error Status from mapping the file or
///         from opening the IPC reader on it.
arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchFileReader>>
OpenIPCFileMMapped(const std::string path) {
  ARROW_ASSIGN_OR_RAISE(
      auto mapped_file,
      arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::type::READ));

  const auto ipc_options = arrow::ipc::IpcReadOptions::Defaults();
  return arrow::ipc::RecordBatchFileReader::Open(mapped_file, ipc_options);
}
/// Wraps the contents of an IPC file reader in a RecordBatchReader.
///
/// The file is first converted to a Table with ToTable(), and the returned
/// reader is a TableBatchReader over that table.
///
/// @param file_reader An open IPC file reader; must not be null.
/// @return A reader over the file's table, or an error Status when
///         `file_reader` is null or the table cannot be read.
arrow::Result<std::shared_ptr<arrow::RecordBatchReader>>
FileToRecordBatchReader(
    const std::shared_ptr<arrow::ipc::RecordBatchFileReader> file_reader) {
  // Report a null reader as an error rather than dereferencing it.
  if (file_reader == nullptr) {
    return arrow::Status::Invalid(
        "FileToRecordBatchReader: file_reader must not be null");
  }

  ARROW_ASSIGN_OR_RAISE(auto tbl, file_reader->ToTable());

  std::shared_ptr<arrow::RecordBatchReader> reader =
      std::make_shared<arrow::TableBatchReader>(tbl);
  return reader;
}
/// Filters an Arrow IPC file down to the rows whose `col_name` value appears
/// in a randomly generated set, and prints how many rows survived.
///
/// The random set holds `sample_size` values drawn from [1, number of rows in
/// the file]. They are matched against the *values* of `col_name` with the
/// "is_in" compute function, not against positional row indices.
///
/// @param path        Path of the Arrow IPC file to read.
/// @param col_name    Name of the column the filter is applied to.
/// @param sample_size Number of random values to generate; must not be
///                    negative.
/// @return Status::OK() on success, otherwise the Status of the first step
///         that failed.
arrow::Status RunMain(const std::string path, const std::string col_name,
                      const int sample_size) {
  if (sample_size < 0) {
    return arrow::Status::Invalid("sample_size must not be negative, got ",
                                  sample_size);
  }

  // Step 1: Open the IPC file as a MemoryMapped file
  ARROW_ASSIGN_OR_RAISE(auto file_reader, OpenIPCFileMMapped(path));

  // Step 2: Convert to a RecordBatchReader so Acero can consume it
  ARROW_ASSIGN_OR_RAISE(auto reader, FileToRecordBatchReader(file_reader));

  // Step 3: Create our random row indices to filter with
  ARROW_ASSIGN_OR_RAISE(auto num_rows, file_reader->CountRows());
  if (num_rows < 1) {
    // With no rows the sampling range [1, num_rows] would be inverted.
    return arrow::Status::Invalid("Cannot sample from '", path,
                                  "': the file contains no rows");
  }
  ARROW_ASSIGN_OR_RAISE(auto values,
                        GenerateRandomArray(1, num_rows, sample_size));

  // Step 4: Create and execute an ExecPlan that filters the RecordBatches
  arrow::acero::Declaration decl = arrow::acero::Declaration::Sequence({
      {"record_batch_reader_source",
       arrow::acero::RecordBatchReaderSourceNodeOptions({reader})},
      {"filter", arrow::acero::FilterNodeOptions{arrow::compute::call(
                     "is_in", {arrow::compute::field_ref(col_name)},
                     arrow::compute::SetLookupOptions{values})}},
  });

  // The second argument is the `use_threads` flag; the original passed the
  // literal 1, which converts to true.
  ARROW_ASSIGN_OR_RAISE(
      auto filtered,
      arrow::acero::DeclarationToExecBatches(decl, /*use_threads=*/true));

  ARROW_ASSIGN_OR_RAISE(
      auto tbl,
      arrow::acero::TableFromExecBatches(reader->schema(), filtered.batches));

  std::cout << "Successfully filtered to " << tbl->num_rows() << " rows."
            << std::endl;
  return arrow::Status::OK();
}
int main(int argc, char **argv) { | |
if (argc < 3) { | |
std::cout << "Usage: ./example path_to_ipc_file.arrow 1234" << std::endl; | |
return -1; | |
} | |
auto path = argv[1]; | |
auto sample_size = std::stoi(argv[2]); | |
std::string col_name = "i"; // Hardcoded for sake of the example | |
std::cout << "Filtering " << path << " to ~" << sample_size | |
<< " random rows..." << std::endl; | |
auto result = RunMain(path, col_name, sample_size); | |
if (!result.ok()) { | |
std::cout << "RunMain() failed with error message: " << result.message() | |
<< std::endl; | |
return -1; | |
} | |
return 0; | |
} |
I prefer to check values in sample code (thinking of ValueUnsafe) and to show proper error handling. A lot of sample code skips that for readability, so it depends on your purpose.
I like how that sounds, what does "show proper error handling" look like to you? I'm trying to follow https://arrow.apache.org/docs/cpp/api/support.html#_CPPv4I0EN5arrow6ResultE but instead of printing the status, I'm trying to propagate the status out of RunMain.
I edited the gist to use ValueOrDie like our docs suggest, but I'm actually going to refactor all of the status/result stuff to use ARROW_ASSIGN_OR_RAISE.
ARROW_ASSIGN_OR_RAISE seems appropriate. I wish it said RETURN instead of RAISE since it doesn't actually raise an exception though. ;)
Yeah, the name is confusing.
Thanks for catching that @EpsilonPrime, that seems silly to me now.