Skip to content

Instantly share code, notes, and snippets.

@westonpace
Created March 16, 2023 20:36
Show Gist options
  • Save westonpace/be500030cc268a626af60abb9299b9ae to your computer and use it in GitHub Desktop.
Save westonpace/be500030cc268a626af60abb9299b9ae to your computer and use it in GitHub Desktop.
Example of applying a group-by operation with arrow-c++
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include <arrow/io/api.h>
#include <arrow/array/builder_primitive.h>
#include <arrow/array/builder_binary.h>
#include <arrow/compute/api.h>
#include <arrow/compute/exec/exec_plan.h>
#include <arrow/compute/exec/options.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <arrow/table.h>
#include <parquet/arrow/writer.h>
#include <iostream>
using arrow::Result;
using arrow::Status;
namespace
{
// Number of rows generated for each column of the sample table.
constexpr int32_t kNumVals = 10000;
// Builds an Int32 array containing the consecutive integers 0 .. kNumVals - 1.
// A failing builder call is returned to the caller as the error of the Result.
Result<std::shared_ptr<arrow::Array>> ValuesArray()
{
  arrow::Int32Builder int_builder;
  // Ask the builder to reserve capacity for every element up front.
  ARROW_RETURN_NOT_OK(int_builder.Reserve(kNumVals));
  int32_t next = 0;
  while (next < kNumVals)
  {
    ARROW_RETURN_NOT_OK(int_builder.Append(next));
    ++next;
  }
  return int_builder.Finish();
}
// Builds a string array of kNumVals entries that alternates between "A"
// (even row index) and "B" (odd row index), starting with "A" at row 0.
// A failing builder call is returned to the caller as the error of the Result.
Result<std::shared_ptr<arrow::Array>> KeysArray()
{
  arrow::StringBuilder key_builder;
  ARROW_RETURN_NOT_OK(key_builder.Reserve(kNumVals));
  for (int32_t row = 0; row < kNumVals; ++row)
  {
    const bool is_even_row = (row % 2 == 0);
    ARROW_RETURN_NOT_OK(key_builder.Append(is_even_row ? "A" : "B"));
  }
  return key_builder.Finish();
}
// Assembles the two-column sample table used by the examples:
//   "keys"   (utf8)  - produced by KeysArray()
//   "values" (int32) - produced by ValuesArray()
// The table is declared with kNumVals rows. An error from either array
// helper is forwarded to the caller.
Result<std::shared_ptr<arrow::Table>> MakeSampleTable()
{
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> key_column, KeysArray());
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Array> value_column, ValuesArray());

  std::shared_ptr<arrow::Field> key_field = arrow::field("keys", arrow::utf8());
  std::shared_ptr<arrow::Field> value_field = arrow::field("values", arrow::int32());
  std::shared_ptr<arrow::Schema> table_schema = arrow::schema({key_field, value_field});

  return arrow::Table::Make(std::move(table_schema),
                            {std::move(key_column), std::move(value_column)},
                            kNumVals);
}
// Groups the sample table by "keys" and sums "values" by declaring an explicit
// two-node plan (table_source -> aggregate), then prints the resulting table
// to stdout. Returns the first error encountered, or OK on success.
Status UsingAPlan()
{
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> input_table, MakeSampleTable());

  // Describe the single aggregation to perform.
  arrow::compute::Aggregate sum_values;
  // Which field to aggregate. Some aggregate functions (e.g. covariance)
  // may require targeting multiple fields.
  sum_values.target = std::vector<arrow::FieldRef>({"values"});
  // The function to apply.
  sum_values.function = "hash_sum";
  // Custom options (e.g. how to handle null).
  sum_values.options = nullptr;
  // The default name of the output column.
  sum_values.name = "SUM OF VALUES";

  // Options for each node of the plan, in execution order.
  arrow::compute::TableSourceNodeOptions source_options(std::move(input_table));
  arrow::compute::AggregateNodeOptions aggregate_options(/*aggregates=*/{sum_values}, /*keys=*/{"keys"});

  arrow::compute::Declaration plan = arrow::compute::Declaration::Sequence(
      {{"table_source", std::move(source_options)},
       {"aggregate", std::move(aggregate_options)}});

  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> result_table,
                        arrow::compute::DeclarationToTable(std::move(plan)));
  std::cout << result_table->ToString() << std::endl;
  return Status::OK();
}
// Performs the same group-by as the plan-based example (sum of "values" per
// distinct "keys" entry), but through the TableGroupBy convenience function,
// then prints the resulting table to stdout. Returns the first error
// encountered, or OK on success.
Status UsingConvenience()
{
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> input_table, MakeSampleTable());

  // Describe the single aggregation to perform.
  arrow::compute::Aggregate sum_values;
  // Which field to aggregate. Some aggregate functions (e.g. covariance)
  // may require targeting multiple fields.
  sum_values.target = std::vector<arrow::FieldRef>({"values"});
  // The function to apply.
  sum_values.function = "hash_sum";
  // Custom options (e.g. how to handle null).
  sum_values.options = nullptr;
  // The default name of the output column.
  sum_values.name = "SUM OF VALUES";

  ARROW_ASSIGN_OR_RAISE(
      std::shared_ptr<arrow::Table> result_table,
      arrow::compute::TableGroupBy(std::move(input_table), {std::move(sum_values)}, {"keys"}));
  std::cout << result_table->ToString() << std::endl;
  return Status::OK();
}
} // namespace
int main()
{
Status st = UsingAPlan();
if (st.ok())
{
st = UsingConvenience();
}
if (!st.ok())
{
std::cerr << st << std::endl;
return 1;
}
return 0;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment