Skip to content

Instantly share code, notes, and snippets.

@chathurawidanage
Created February 20, 2020 16:40
Show Gist options
  • Save chathurawidanage/248d2b2fe86d124e0ef7b4bbd431ba9e to your computer and use it in GitHub Desktop.
Save chathurawidanage/248d2b2fe86d124e0ef7b4bbd431ba9e to your computer and use it in GitHub Desktop.
#include <arrow/compute/api.h>
#include <arrow/array/builder_base.h>
#include "arrow_utils.h"
namespace twisterx::util {
void copy_int64_by_index(int64_t index,
std::shared_ptr<arrow::Array> data_column,
std::shared_ptr<arrow::ArrayBuilder> array_builder) {
auto casted_builder = std::static_pointer_cast<arrow::Int64Builder>(array_builder);
auto casted_data_array = std::static_pointer_cast<arrow::Int64Array>(data_column);
//casted_data_array->Value(index)
casted_builder->Append(NULLPTR);
}
void sort_column(const std::shared_ptr<arrow::Array> &data_column,
const std::shared_ptr<arrow::Int64Array> &sorted_indices,
const std::shared_ptr<arrow::ArrayBuilder> &array_builder) {
int64_t length = sorted_indices->length();
arrow::Status reserveStatus = array_builder->Reserve(length);
for (int64_t index = 0; index < length; ++index) {
int64_t current_index = sorted_indices->Value(index);
copy_int64_by_index(current_index, data_column, array_builder);
}
}
template<typename JOIN_COLUMN_ARRAY, typename ARROW_KEY_TYPE, typename CPP_KEY_TYPE>
void sort_table(std::shared_ptr<arrow::Table> tab, int64_t sort_column_index,
std::shared_ptr<std::unordered_map<int64_t, std::shared_ptr<arrow::ArrayBuilder>>> column_builders,
arrow::MemoryPool *memory_pool) {
std::shared_ptr<arrow::Table> tab_to_process;
// combine chunks if multiple chunks are available
if (tab->column(sort_column_index)->num_chunks() > 1) {
arrow::Status left_combine_stat = tab->CombineChunks(memory_pool, &tab);
} else {
tab_to_process = tab;
}
auto column_to_sort = std::static_pointer_cast<JOIN_COLUMN_ARRAY>(
tab_to_process->column(sort_column_index)->chunk(0));
// sort to indices
std::shared_ptr<arrow::Array> sorted_column_index;
arrow::compute::FunctionContext ctx;
arrow::Status status = arrow::compute::SortToIndices(&ctx, *column_to_sort, &sorted_column_index);
auto index_lookup = std::static_pointer_cast<arrow::Int64Array>(sorted_column_index);
// now sort everything based on sorted index
int64_t no_of_columns = tab_to_process->num_columns();
for (int64_t col_index = 0; col_index < no_of_columns; ++col_index) {
sort_column(tab_to_process->column(col_index)->chunk(0), index_lookup,
std::shared_ptr<arrow::ArrayBuilder>(column_builders->find(col_index)->second));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment