-
-
Save amoeba/2b9703ee49e6465d8b424e67b78770ee to your computer and use it in GitHub Desktop.
#include <cstdint>
#include <iostream>
#include <memory>
#include <random>
#include <stdexcept>
#include <string>
#include <vector>

#include "arrow/acero/exec_plan.h"
#include "arrow/acero/options.h"
#include "arrow/acero/util.h"
#include "arrow/array/data.h"
#include "arrow/builder.h"
#include "arrow/compute/api_scalar.h"
#include "arrow/compute/expression.h"
#include "arrow/datum.h"
#include "arrow/io/file.h"
#include "arrow/ipc/api.h"
#include "arrow/ipc/reader.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/table.h"
// Seed shared by every call to GenerateRandomInt64(); drawn once from
// std::random_device during static initialization.
std::random_device os_seed;
const uint_least64_t seed = os_seed();

/// Generate `size` pseudo-random integers uniformly distributed over the
/// closed interval [start, end].
///
/// The generator is re-created from the same process-wide `seed` on every
/// call, so repeated calls with identical arguments within one process yield
/// the same sequence.
///
/// \param start smallest value that may be produced (inclusive)
/// \param end largest value that may be produced (inclusive)
/// \param size number of values to generate
/// \return a shared vector holding the generated values; the vector is empty
///         when `size` is not positive or when the interval is empty
///         (`start > end`)
std::shared_ptr<const std::vector<int64_t>>
GenerateRandomInt64(const int64_t start, const int64_t end,
                    const int64_t size) {
  auto values = std::make_shared<std::vector<int64_t>>();

  // Constructing std::uniform_int_distribution with start > end is undefined
  // behaviour, and a negative size would wrap to an enormous reserve()
  // request. Neither case can produce any values, so return the empty vector.
  if (size <= 0 || start > end) {
    return values;
  }

  std::mt19937_64 generator(seed);
  std::uniform_int_distribution<int_least64_t> distribute(start, end);

  values->reserve(static_cast<std::size_t>(size));
  for (int64_t i = 0; i < size; ++i) {
    values->push_back(distribute(generator));
  }
  return values;
}
/// Build an Arrow Int64 array from the values returned by
/// GenerateRandomInt64(start, end, size).
///
/// \return the finished array, or the error Status reported by the builder
///         while appending or finishing
arrow::Result<std::shared_ptr<arrow::Array>>
GenerateRandomArray(const int64_t start, const int64_t end,
                    const int64_t size) {
  const auto random_values = GenerateRandomInt64(start, end, size);

  arrow::Int64Builder int64_builder;
  ARROW_RETURN_NOT_OK(int64_builder.AppendValues(random_values->begin(),
                                                 random_values->end()));

  // Finish() already yields a Result, so it carries either the array or the
  // failure Status straight back to the caller.
  return int64_builder.Finish();
}
/// Open the Arrow IPC file at `path` through a read-only memory mapping and
/// wrap it in a RecordBatchFileReader using the default IPC read options.
///
/// \return the file reader, or the error Status from mapping the file or from
///         opening the reader on it
arrow::Result<std::shared_ptr<arrow::ipc::RecordBatchFileReader>>
OpenIPCFileMMapped(const std::string path) {
  ARROW_ASSIGN_OR_RAISE(
      auto mapped_file,
      arrow::io::MemoryMappedFile::Open(path, arrow::io::FileMode::type::READ));

  const auto ipc_options = arrow::ipc::IpcReadOptions::Defaults();
  return arrow::ipc::RecordBatchFileReader::Open(mapped_file, ipc_options);
}
/// Adapt a RecordBatchFileReader to the RecordBatchReader interface by
/// converting the file to a Table and wrapping it in a TableBatchReader.
///
/// \return the batch reader, or the error Status from ToTable()
arrow::Result<std::shared_ptr<arrow::RecordBatchReader>>
FileToRecordBatchReader(
    const std::shared_ptr<arrow::ipc::RecordBatchFileReader> file_reader) {
  ARROW_ASSIGN_OR_RAISE(auto whole_table, file_reader->ToTable());

  std::shared_ptr<arrow::RecordBatchReader> batch_reader =
      std::make_shared<arrow::TableBatchReader>(whole_table);
  return batch_reader;
}
/// Filter the IPC file at `path` down to the rows whose `col_name` value is a
/// member of a randomly generated set, and print how many rows remain.
///
/// The random set holds `sample_size` values drawn from [1, num_rows]; they
/// are matched against the column's values with `is_in`, not against
/// positional row numbers.
///
/// \param path location of the Arrow IPC file to read
/// \param col_name name of the column the filter is applied to
/// \param sample_size how many random values to generate for the lookup set
/// \return Status::OK() on success; Status::Invalid when `sample_size` is
///         negative or the file has no rows; otherwise the first error
///         reported by any step
arrow::Status RunMain(const std::string path, const std::string col_name,
                      const int sample_size) {
  if (sample_size < 0) {
    return arrow::Status::Invalid("sample_size must be non-negative, got ",
                                  sample_size);
  }

  // Step 1: Open the IPC file as a MemoryMapped file
  ARROW_ASSIGN_OR_RAISE(auto file_reader, OpenIPCFileMMapped(path));

  // Step 2: Convert to a RecordBatchReader so Acero can consume it
  ARROW_ASSIGN_OR_RAISE(auto reader, FileToRecordBatchReader(file_reader));

  // Step 3: Create the random values to filter with
  ARROW_ASSIGN_OR_RAISE(const int64_t num_rows, file_reader->CountRows());
  if (num_rows < 1) {
    // [1, num_rows] would be an empty interval, which the random number
    // generation cannot sample from.
    return arrow::Status::Invalid("Cannot sample from '", path,
                                  "': the file contains no rows");
  }
  ARROW_ASSIGN_OR_RAISE(auto values,
                        GenerateRandomArray(1, num_rows, sample_size));

  // Step 4: Create and execute an ExecPlan that filters the RecordBatches
  arrow::acero::Declaration decl = arrow::acero::Declaration::Sequence({
      {"record_batch_reader_source",
       arrow::acero::RecordBatchReaderSourceNodeOptions({reader})},
      {"filter", arrow::acero::FilterNodeOptions{arrow::compute::call(
                     "is_in", {arrow::compute::field_ref(col_name)},
                     arrow::compute::SetLookupOptions{values})}},
  });

  // Check the plan's result before touching it: dereferencing a failed Result
  // aborts the process instead of reporting the error to the caller.
  ARROW_ASSIGN_OR_RAISE(
      auto filtered,
      arrow::acero::DeclarationToExecBatches(decl, /*use_threads=*/true));
  ARROW_ASSIGN_OR_RAISE(
      auto tbl,
      arrow::acero::TableFromExecBatches(reader->schema(), filtered.batches));

  std::cout << "Successfully filtered to " << tbl->num_rows() << " rows."
            << std::endl;
  return arrow::Status::OK();
}
int main(int argc, char **argv) { | |
if (argc < 3) { | |
std::cout << "Usage: ./example path_to_ipc_file.arrow 1234" << std::endl; | |
return -1; | |
} | |
auto path = argv[1]; | |
auto sample_size = std::stoi(argv[2]); | |
std::string col_name = "i"; // Hardcoded for sake of the example | |
std::cout << "Filtering " << path << " to ~" << sample_size | |
<< " random rows..." << std::endl; | |
auto result = RunMain(path, col_name, sample_size); | |
if (!result.ok()) { | |
std::cout << "RunMain() failed with error message: " << result.message() | |
<< std::endl; | |
return -1; | |
} | |
return 0; | |
} |
Calling ValueOrDie and then returning an OK error status seems odd. Either remove the error return or handle the error.
Calling ValueOrDie and then returning an OK error status seems odd. Either remove the error return or handle the error.
Thanks for catching that @EpsilonPrime, that seems silly to me now.
I prefer to check values in sample code (thinking of ValuesUnsafe) and show proper error handling. A lot of sample code skips that for readability. Depends on your purpose.
I like how that sounds, what does "show proper error handling" look like to you? I'm trying to follow https://arrow.apache.org/docs/cpp/api/support.html#_CPPv4I0EN5arrow6ResultE but instead of printing the status, I'm trying to propagate the status out of RunMain.
I edited the gist to use ValueOrDie like our docs suggest, but I'm actually going to refactor all of the status/result handling to use ARROW_ASSIGN_OR_RAISE.
ARROW_ASSIGN_OR_RAISE seems appropriate. I wish it said RETURN instead of RAISE since it doesn't actually raise an exception though. ;)
Yeah, the name is confusing.
I prefer to check values in sample code (thinking of ValuesUnsafe) and show proper error handling. A lot of sample code skips that for readability. Depends on your purpose.