Skip to content

Instantly share code, notes, and snippets.

@wesm
Created October 9, 2018 14:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wesm/e8e43aba036db747fb9c021d590be938 to your computer and use it in GitHub Desktop.
Save wesm/e8e43aba036db747fb9c021d590be938 to your computer and use it in GitHub Desktop.
diff --git a/cpp/src/parquet/.parquetcppversion b/cpp/src/parquet/.parquetcppversion
index d65937f10..f825f7c7f 100644
--- a/cpp/src/parquet/.parquetcppversion
+++ b/cpp/src/parquet/.parquetcppversion
@@ -1 +1 @@
-1.4.1-SNAPSHOT
+1.5.1-SNAPSHOT
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index 5f4e12349..086672711 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1391,6 +1391,16 @@ TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
ASSERT_RAISES(Invalid, WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
default_writer_properties(), coerce_millis));
+ // OK to lose precision if we explicitly allow it
+ auto allow_truncation = (ArrowWriterProperties::Builder()
+ .coerce_timestamps(TimeUnit::MILLI)
+ ->allow_truncated_timestamps()
+ ->build());
+ ASSERT_OK_NO_THROW(WriteTable(*t3, ::arrow::default_memory_pool(), sink, 10,
+ default_writer_properties(), allow_truncation));
+ ASSERT_OK_NO_THROW(WriteTable(*t4, ::arrow::default_memory_pool(), sink, 10,
+ default_writer_properties(), allow_truncation));
+
// OK to write micros to micros
auto coerce_micros =
(ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build());
@@ -2316,11 +2326,11 @@ TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) {
ASSERT_OK_NO_THROW(arrow_reader->ReadTable(&table));
}
-class TestArrowReaderAdHocSpark
+class TestArrowReaderAdHocSparkAndHvr
: public ::testing::TestWithParam<
std::tuple<std::string, std::shared_ptr<::DataType>>> {};
-TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) {
+TEST_P(TestArrowReaderAdHocSparkAndHvr, ReadDecimals) {
std::string path(test::get_data_dir());
std::string filename;
@@ -2364,12 +2374,13 @@ TEST_P(TestArrowReaderAdHocSpark, ReadDecimals) {
}
INSTANTIATE_TEST_CASE_P(
- ReadDecimals, TestArrowReaderAdHocSpark,
+ ReadDecimals, TestArrowReaderAdHocSparkAndHvr,
::testing::Values(
std::make_tuple("int32_decimal.parquet", ::arrow::decimal(4, 2)),
std::make_tuple("int64_decimal.parquet", ::arrow::decimal(10, 2)),
std::make_tuple("fixed_length_decimal.parquet", ::arrow::decimal(25, 2)),
- std::make_tuple("fixed_length_decimal_legacy.parquet", ::arrow::decimal(13, 2))));
+ std::make_tuple("fixed_length_decimal_legacy.parquet", ::arrow::decimal(13, 2)),
+ std::make_tuple("byte_array_decimal.parquet", ::arrow::decimal(4, 2))));
} // namespace arrow
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 11fb20cd1..2006025ac 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -18,26 +18,30 @@
#include "parquet/arrow/reader.h"
#include <algorithm>
-#include <atomic>
-#include <chrono>
-#include <mutex>
-#include <queue>
+#include <climits>
+#include <cstring>
+#include <future>
+#include <ostream>
#include <string>
-#include <thread>
#include <type_traits>
#include <utility>
#include <vector>
#include "arrow/api.h"
#include "arrow/util/bit-util.h"
-#include "arrow/util/decimal.h"
#include "arrow/util/logging.h"
#include "arrow/util/thread-pool.h"
#include "parquet/arrow/record_reader.h"
#include "parquet/arrow/schema.h"
#include "parquet/column_reader.h"
+#include "parquet/exception.h"
+#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
+#include "parquet/properties.h"
#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
#include "parquet/util/schema-util.h"
using arrow::Array;
@@ -1221,6 +1225,64 @@ struct TransferFunctor<::arrow::Decimal128Type, FLBAType> {
}
};
+/// \brief Convert BYTE_ARRAY-encoded decimals into an arrow::Decimal128Array
+/// We do this by:
+/// 1. Finishing the RecordReader's builder into a temporary arrow::BinaryArray
+/// 2. Allocating a buffer of byte_width() bytes per value for the result
+/// 3. Converting the big-endian bytes of each non-null, non-empty entry into the
+/// 16-byte decimal representation; null and empty entries are left as zero.
+template <>
+struct TransferFunctor<::arrow::Decimal128Type, ByteArrayType> {
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ DCHECK_EQ(type->id(), ::arrow::Type::DECIMAL);
+
+ // Finish the built data into a temporary array
+ std::shared_ptr<Array> array;
+ RETURN_NOT_OK(reader->builder()->Finish(&array));
+ const auto& binary_array = static_cast<const ::arrow::BinaryArray&>(*array);
+
+ const int64_t length = binary_array.length();
+
+ const auto& decimal_type = static_cast<const ::arrow::Decimal128Type&>(*type);
+ const int64_t type_length = decimal_type.byte_width();
+
+ std::shared_ptr<Buffer> data;
+ RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * type_length, &data));
+
+ // raw bytes that we can write to
+ uint8_t* out_ptr = data->mutable_data();
+
+ const int64_t null_count = binary_array.null_count();
+
+ // convert each BinaryArray value to valid decimal bytes
+ for (int64_t i = 0; i < length; i++, out_ptr += type_length) {
+ int32_t record_len = 0;
+ const uint8_t* record_loc = binary_array.GetValue(i, &record_len);
+
+ if ((record_len < 0) || (record_len > type_length)) {
+ return Status::Invalid("Invalid BYTE_ARRAY size");
+ }
+
+ // Zero the 16-byte slot so null and empty entries decode as 0
+ auto out_ptr_view = reinterpret_cast<uint64_t*>(out_ptr);
+ out_ptr_view[0] = 0;
+ out_ptr_view[1] = 0;
+ // Convert only non-null, non-empty entries: an empty value has no bytes to
+ // convert (the helper presumably reads byte 0 as the sign -- confirm)
+ if ((null_count == 0 || !binary_array.IsNull(i)) && record_len > 0) {
+ RawBytesToDecimalBytes(record_loc, record_len, out_ptr);
+ }
+ }
+
+ *out = std::make_shared<::arrow::Decimal128Array>(
+ type, length, data, binary_array.null_bitmap(), null_count);
+
+ return Status::OK();
+ }
+};
+
/// \brief Convert an Int32 or Int64 array into a Decimal128Array
/// The parquet spec allows systems to write decimals in int32, int64 if the values are
/// small enough to fit in less 4 bytes or less than 8 bytes, respectively.
@@ -1353,12 +1415,16 @@ Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>*
case ::parquet::Type::INT64: {
TRANSFER_DATA(::arrow::Decimal128Type, Int64Type);
} break;
+ case ::parquet::Type::BYTE_ARRAY: {
+ TRANSFER_DATA(::arrow::Decimal128Type, ByteArrayType);
+ } break;
case ::parquet::Type::FIXED_LEN_BYTE_ARRAY: {
TRANSFER_DATA(::arrow::Decimal128Type, FLBAType);
} break;
default:
return Status::Invalid(
- "Physical type for decimal must be int32, int64, or fixed length binary");
+ "Physical type for decimal must be int32, int64, byte array, or fixed "
+ "length binary");
}
} break;
case ::arrow::Type::TIMESTAMP: {
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index 6eee0f6e2..2cd94ca28 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -18,25 +18,32 @@
#ifndef PARQUET_ARROW_READER_H
#define PARQUET_ARROW_READER_H
+#include <cstdint>
#include <memory>
#include <vector>
-#include "parquet/api/reader.h"
-#include "parquet/api/schema.h"
+#include "parquet/util/visibility.h"
#include "arrow/io/interfaces.h"
+#include "arrow/util/macros.h"
namespace arrow {
class Array;
class MemoryPool;
class RecordBatchReader;
+class Schema;
class Status;
class Table;
+
} // namespace arrow
namespace parquet {
+class FileMetaData;
+class ParquetFileReader;
+class ReaderProperties;
+
namespace arrow {
class ColumnChunkReader;
diff --git a/cpp/src/parquet/arrow/record_reader.cc b/cpp/src/parquet/arrow/record_reader.cc
index 3fbdfd586..ce6fa2a5b 100644
--- a/cpp/src/parquet/arrow/record_reader.cc
+++ b/cpp/src/parquet/arrow/record_reader.cc
@@ -19,21 +19,29 @@
#include <algorithm>
#include <cstdint>
+#include <cstring>
#include <memory>
#include <sstream>
+#include <unordered_map>
#include <utility>
-#include <arrow/buffer.h>
-#include <arrow/memory_pool.h>
-#include <arrow/status.h>
-#include <arrow/util/bit-util.h>
-#include <arrow/util/rle-encoding.h>
+#include "arrow/buffer.h"
+#include "arrow/builder.h"
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+#include "arrow/type.h"
+#include "arrow/util/bit-util.h"
+#include "arrow/util/logging.h"
+#include "arrow/util/rle-encoding.h"
#include "parquet/column_page.h"
#include "parquet/column_reader.h"
#include "parquet/encoding-internal.h"
+#include "parquet/encoding.h"
#include "parquet/exception.h"
#include "parquet/properties.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
using arrow::MemoryPool;
diff --git a/cpp/src/parquet/arrow/record_reader.h b/cpp/src/parquet/arrow/record_reader.h
index 4935713a2..8da070999 100644
--- a/cpp/src/parquet/arrow/record_reader.h
+++ b/cpp/src/parquet/arrow/record_reader.h
@@ -19,22 +19,24 @@
#define PARQUET_RECORD_READER_H
#include <cstdint>
-#include <cstring>
-#include <iostream>
#include <memory>
-#include <unordered_map>
-#include <vector>
-#include <arrow/buffer.h>
-#include <arrow/builder.h>
-#include <arrow/memory_pool.h>
-#include <arrow/util/bit-util.h>
+#include "arrow/memory_pool.h"
-#include "parquet/column_reader.h"
-#include "parquet/schema.h"
#include "parquet/util/macros.h"
+#include "parquet/util/memory.h"
+
+namespace arrow {
+
+class ArrayBuilder;
+
+} // namespace arrow
namespace parquet {
+
+class ColumnDescriptor;
+class PageReader;
+
namespace internal {
/// \brief Stateful column reader that delimits semantic records for both flat
diff --git a/cpp/src/parquet/arrow/schema.h b/cpp/src/parquet/arrow/schema.h
index 3b212da7e..8e920850c 100644
--- a/cpp/src/parquet/arrow/schema.h
+++ b/cpp/src/parquet/arrow/schema.h
@@ -18,14 +18,16 @@
#ifndef PARQUET_ARROW_SCHEMA_H
#define PARQUET_ARROW_SCHEMA_H
+#include <cstdint>
#include <memory>
#include <vector>
#include "arrow/api.h"
-#include "parquet/api/schema.h"
-#include "parquet/api/writer.h"
#include "parquet/arrow/writer.h"
+#include "parquet/metadata.h"
+#include "parquet/schema.h"
+#include "parquet/util/visibility.h"
namespace arrow {
@@ -35,8 +37,12 @@ class Status;
namespace parquet {
+class WriterProperties;
+
namespace arrow {
+class ArrowWriterProperties;
+
PARQUET_EXPORT
::arrow::Status NodeToField(const schema::Node& node,
std::shared_ptr<::arrow::Field>* out);
diff --git a/cpp/src/parquet/arrow/writer.cc b/cpp/src/parquet/arrow/writer.cc
index 9247b84cf..923f13294 100644
--- a/cpp/src/parquet/arrow/writer.cc
+++ b/cpp/src/parquet/arrow/writer.cc
@@ -366,8 +366,9 @@ class ArrowColumnWriter {
Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t* def_levels,
const int16_t* rep_levels);
- Status WriteTimestampsCoerce(const Array& data, int64_t num_levels,
- const int16_t* def_levels, const int16_t* rep_levels);
+ Status WriteTimestampsCoerce(const bool truncated_timestamps_allowed, const Array& data,
+ int64_t num_levels, const int16_t* def_levels,
+ const int16_t* rep_levels);
template <typename ParquetType, typename ArrowType>
Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values,
@@ -626,7 +627,8 @@ Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_level
// Casting is required. This covers several cases
// * Nanoseconds -> cast to microseconds
// * coerce_timestamps_enabled_, cast all timestamps to requested unit
- return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels);
+ return WriteTimestampsCoerce(ctx_->properties->truncated_timestamps_allowed(), values,
+ num_levels, def_levels, rep_levels);
} else {
// No casting of timestamps is required, take the fast path
return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels,
@@ -634,7 +636,8 @@ Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_level
}
}
-Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_levels,
+Status ArrowColumnWriter::WriteTimestampsCoerce(const bool truncated_timestamps_allowed,
+ const Array& array, int64_t num_levels,
const int16_t* def_levels,
const int16_t* rep_levels) {
int64_t* buffer;
@@ -652,7 +655,7 @@ Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_
auto DivideBy = [&](const int64_t factor) {
for (int64_t i = 0; i < array.length(); i++) {
- if (!data.IsNull(i) && (values[i] % factor != 0)) {
+ if (!truncated_timestamps_allowed && !data.IsNull(i) && (values[i] % factor != 0)) {
std::stringstream ss;
ss << "Casting from " << type.ToString() << " to " << target_type->ToString()
<< " would lose data: " << values[i];
diff --git a/cpp/src/parquet/arrow/writer.h b/cpp/src/parquet/arrow/writer.h
index ad6f1d52d..7e4b2287b 100644
--- a/cpp/src/parquet/arrow/writer.h
+++ b/cpp/src/parquet/arrow/writer.h
@@ -44,7 +44,10 @@ class PARQUET_EXPORT ArrowWriterProperties {
public:
class Builder {
public:
- Builder() : write_nanos_as_int96_(false), coerce_timestamps_enabled_(false) {}
+ Builder()
+ : write_nanos_as_int96_(false),
+ coerce_timestamps_enabled_(false),
+ truncated_timestamps_allowed_(false) {}
virtual ~Builder() {}
Builder* disable_deprecated_int96_timestamps() {
@@ -63,9 +66,20 @@ class PARQUET_EXPORT ArrowWriterProperties {
return this;
}
+ Builder* allow_truncated_timestamps() {
+ truncated_timestamps_allowed_ = true;
+ return this;
+ }
+
+ Builder* disallow_truncated_timestamps() {
+ truncated_timestamps_allowed_ = false;
+ return this;
+ }
+
std::shared_ptr<ArrowWriterProperties> build() {
return std::shared_ptr<ArrowWriterProperties>(new ArrowWriterProperties(
- write_nanos_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_));
+ write_nanos_as_int96_, coerce_timestamps_enabled_, coerce_timestamps_unit_,
+ truncated_timestamps_allowed_));
}
private:
@@ -73,6 +87,7 @@ class PARQUET_EXPORT ArrowWriterProperties {
bool coerce_timestamps_enabled_;
::arrow::TimeUnit::type coerce_timestamps_unit_;
+ bool truncated_timestamps_allowed_;
};
bool support_deprecated_int96_timestamps() const { return write_nanos_as_int96_; }
@@ -82,17 +97,22 @@ class PARQUET_EXPORT ArrowWriterProperties {
return coerce_timestamps_unit_;
}
+ bool truncated_timestamps_allowed() const { return truncated_timestamps_allowed_; }
+
private:
explicit ArrowWriterProperties(bool write_nanos_as_int96,
bool coerce_timestamps_enabled,
- ::arrow::TimeUnit::type coerce_timestamps_unit)
+ ::arrow::TimeUnit::type coerce_timestamps_unit,
+ bool truncated_timestamps_allowed)
: write_nanos_as_int96_(write_nanos_as_int96),
coerce_timestamps_enabled_(coerce_timestamps_enabled),
- coerce_timestamps_unit_(coerce_timestamps_unit) {}
+ coerce_timestamps_unit_(coerce_timestamps_unit),
+ truncated_timestamps_allowed_(truncated_timestamps_allowed) {}
const bool write_nanos_as_int96_;
const bool coerce_timestamps_enabled_;
const ::arrow::TimeUnit::type coerce_timestamps_unit_;
+ const bool truncated_timestamps_allowed_;
};
std::shared_ptr<ArrowWriterProperties> PARQUET_EXPORT default_arrow_writer_properties();
diff --git a/cpp/src/parquet/bloom_filter-test.cc b/cpp/src/parquet/bloom_filter-test.cc
index 96d2e065f..945f80b7b 100644
--- a/cpp/src/parquet/bloom_filter-test.cc
+++ b/cpp/src/parquet/bloom_filter-test.cc
@@ -17,13 +17,21 @@
#include <gtest/gtest.h>
-#include <algorithm>
+#include <cstdint>
+#include <limits>
+#include <memory>
#include <random>
#include <string>
+#include <vector>
+#include "arrow/buffer.h"
#include "arrow/io/file.h"
+#include "arrow/status.h"
+
#include "parquet/bloom_filter.h"
+#include "parquet/exception.h"
#include "parquet/murmur3.h"
+#include "parquet/types.h"
#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 173292ecd..7fbf9babd 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -52,7 +52,8 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
num_bytes = *reinterpret_cast<const int32_t*>(data);
const uint8_t* decoder_data = data + sizeof(int32_t);
if (!rle_decoder_) {
- rle_decoder_.reset(new ::arrow::RleDecoder(decoder_data, num_bytes, bit_width_));
+ rle_decoder_.reset(
+ new ::arrow::util::RleDecoder(decoder_data, num_bytes, bit_width_));
} else {
rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
}
@@ -62,7 +63,7 @@ int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
num_bytes =
static_cast<int32_t>(BitUtil::BytesForBits(num_buffered_values * bit_width_));
if (!bit_packed_decoder_) {
- bit_packed_decoder_.reset(new ::arrow::BitReader(data, num_bytes));
+ bit_packed_decoder_.reset(new ::arrow::BitUtil::BitReader(data, num_bytes));
} else {
bit_packed_decoder_->Reset(data, num_bytes);
}
@@ -123,7 +124,7 @@ class SerializedPageReader : public PageReader {
std::shared_ptr<Page> current_page_;
// Compression codec to use.
- std::unique_ptr<::arrow::Codec> decompressor_;
+ std::unique_ptr<::arrow::util::Codec> decompressor_;
std::shared_ptr<ResizableBuffer> decompression_buffer_;
// Maximum allowed page size
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index d1b4d2ef5..960f2107d 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -44,8 +44,13 @@
namespace arrow {
+namespace BitUtil {
class BitReader;
+} // namespace BitUtil
+
+namespace util {
class RleDecoder;
+} // namespace util
} // namespace arrow
@@ -76,8 +81,8 @@ class PARQUET_EXPORT LevelDecoder {
int bit_width_;
int num_values_remaining_;
Encoding::type encoding_;
- std::unique_ptr<::arrow::RleDecoder> rle_decoder_;
- std::unique_ptr<::arrow::BitReader> bit_packed_decoder_;
+ std::unique_ptr<::arrow::util::RleDecoder> rle_decoder_;
+ std::unique_ptr<::arrow::BitUtil::BitReader> bit_packed_decoder_;
};
// Abstract page iterator interface. This way, we can feed column pages to the
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index 9c7a39bfe..a45613f1b 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -34,8 +34,8 @@
namespace parquet {
-using BitWriter = ::arrow::BitWriter;
-using RleEncoder = ::arrow::RleEncoder;
+using BitWriter = ::arrow::BitUtil::BitWriter;
+using RleEncoder = ::arrow::util::RleEncoder;
LevelEncoder::LevelEncoder() {}
LevelEncoder::~LevelEncoder() {}
@@ -271,7 +271,7 @@ class SerializedPageWriter : public PageWriter {
int64_t total_compressed_size_;
// Compression codec to use.
- std::unique_ptr<::arrow::Codec> compressor_;
+ std::unique_ptr<::arrow::util::Codec> compressor_;
};
// This implementation of the PageWriter writes to the final sink on Close .
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index e3bfcf0ae..457c532bb 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -34,8 +34,13 @@
namespace arrow {
+namespace BitUtil {
class BitWriter;
+} // namespace BitUtil
+
+namespace util {
class RleEncoder;
+} // namespace util
} // namespace arrow
@@ -67,8 +72,8 @@ class PARQUET_EXPORT LevelEncoder {
int bit_width_;
int rle_length_;
Encoding::type encoding_;
- std::unique_ptr<::arrow::RleEncoder> rle_encoder_;
- std::unique_ptr<::arrow::BitWriter> bit_packed_encoder_;
+ std::unique_ptr<::arrow::util::RleEncoder> rle_encoder_;
+ std::unique_ptr<::arrow::BitUtil::BitWriter> bit_packed_encoder_;
};
class PageWriter {
diff --git a/cpp/src/parquet/encoding-internal.h b/cpp/src/parquet/encoding-internal.h
index 0bfd26fbd..93d499300 100644
--- a/cpp/src/parquet/encoding-internal.h
+++ b/cpp/src/parquet/encoding-internal.h
@@ -143,7 +143,7 @@ class PlainDecoder<BooleanType> : public Decoder<BooleanType> {
virtual void SetData(int num_values, const uint8_t* data, int len) {
num_values_ = num_values;
- bit_reader_ = ::arrow::BitReader(data, len);
+ bit_reader_ = BitUtil::BitReader(data, len);
}
// Two flavors of bool decoding
@@ -175,7 +175,7 @@ class PlainDecoder<BooleanType> : public Decoder<BooleanType> {
}
private:
- ::arrow::BitReader bit_reader_;
+ BitUtil::BitReader bit_reader_;
};
// ----------------------------------------------------------------------
@@ -210,7 +210,7 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
bits_available_(kInMemoryDefaultCapacity * 8),
bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
values_sink_(new InMemoryOutputStream(pool)) {
- bit_writer_.reset(new ::arrow::BitWriter(bits_buffer_->mutable_data(),
+ bit_writer_.reset(new BitUtil::BitWriter(bits_buffer_->mutable_data(),
static_cast<int>(bits_buffer_->size())));
}
@@ -274,7 +274,7 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
protected:
int bits_available_;
- std::unique_ptr<::arrow::BitWriter> bit_writer_;
+ std::unique_ptr<BitUtil::BitWriter> bit_writer_;
std::shared_ptr<ResizableBuffer> bits_buffer_;
std::unique_ptr<InMemoryOutputStream> values_sink_;
};
@@ -341,7 +341,7 @@ class DictionaryDecoder : public Decoder<Type> {
uint8_t bit_width = *data;
++data;
--len;
- idx_decoder_ = ::arrow::RleDecoder(data, len, bit_width);
+ idx_decoder_ = ::arrow::util::RleDecoder(data, len, bit_width);
}
int Decode(T* buffer, int max_values) override {
@@ -376,7 +376,7 @@ class DictionaryDecoder : public Decoder<Type> {
// pointers).
std::shared_ptr<ResizableBuffer> byte_array_data_;
- ::arrow::RleDecoder idx_decoder_;
+ ::arrow::util::RleDecoder idx_decoder_;
};
template <typename Type>
@@ -468,7 +468,7 @@ class DictEncoder : public Encoder<DType> {
dict_encoded_size_(0),
type_length_(desc->type_length()) {
hash_slots_.Assign(hash_table_size_, HASH_SLOT_EMPTY);
- cpu_info_ = ::arrow::CpuInfo::GetInstance();
+ cpu_info_ = ::arrow::internal::CpuInfo::GetInstance();
}
~DictEncoder() override { DCHECK(buffered_indices_.empty()); }
@@ -487,9 +487,9 @@ class DictEncoder : public Encoder<DType> {
// an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
// but not reserving them would cause the encoder to fail.
return 1 +
- ::arrow::RleEncoder::MaxBufferSize(
+ ::arrow::util::RleEncoder::MaxBufferSize(
bit_width(), static_cast<int>(buffered_indices_.size())) +
- ::arrow::RleEncoder::MinBufferSize(bit_width());
+ ::arrow::util::RleEncoder::MinBufferSize(bit_width());
}
/// The minimum bit width required to encode the currently buffered indices.
@@ -580,7 +580,7 @@ class DictEncoder : public Encoder<DType> {
// For ByteArray / FixedLenByteArray data. Not owned
ChunkedAllocator* pool_;
- ::arrow::CpuInfo* cpu_info_;
+ ::arrow::internal::CpuInfo* cpu_info_;
/// Size of the table. Must be a power of 2.
int hash_table_size_;
@@ -791,7 +791,7 @@ inline int DictEncoder<DType>::WriteIndices(uint8_t* buffer, int buffer_len) {
++buffer;
--buffer_len;
- ::arrow::RleEncoder encoder(buffer, buffer_len, bit_width());
+ ::arrow::util::RleEncoder encoder(buffer, buffer_len, bit_width());
for (int index : buffered_indices_) {
if (!encoder.Put(index)) return -1;
}
@@ -819,7 +819,7 @@ class DeltaBitPackDecoder : public Decoder<DType> {
virtual void SetData(int num_values, const uint8_t* data, int len) {
num_values_ = num_values;
- decoder_ = ::arrow::BitReader(data, len);
+ decoder_ = BitUtil::BitReader(data, len);
values_current_block_ = 0;
values_current_mini_block_ = 0;
}
@@ -885,7 +885,7 @@ class DeltaBitPackDecoder : public Decoder<DType> {
}
::arrow::MemoryPool* pool_;
- ::arrow::BitReader decoder_;
+ BitUtil::BitReader decoder_;
int32_t values_current_block_;
int32_t num_mini_blocks_;
uint64_t values_per_mini_block_;
diff --git a/cpp/src/parquet/file-deserialize-test.cc b/cpp/src/parquet/file-deserialize-test.cc
index b766eedf5..17dfe387f 100644
--- a/cpp/src/parquet/file-deserialize-test.cc
+++ b/cpp/src/parquet/file-deserialize-test.cc
@@ -17,16 +17,11 @@
#include <gtest/gtest.h>
-#include <algorithm>
#include <cstdint>
-#include <cstdlib>
#include <cstring>
-#include <exception>
#include <memory>
-#include <string>
-#include <vector>
-#include "parquet/column_reader.h"
+#include "parquet/column_page.h"
#include "parquet/exception.h"
#include "parquet/file_reader.h"
#include "parquet/thrift.h"
@@ -34,6 +29,8 @@
#include "parquet/util/memory.h"
#include "parquet/util/test-common.h"
+#include "arrow/io/memory.h"
+#include "arrow/status.h"
#include "arrow/util/compression.h"
namespace parquet {
@@ -196,7 +193,7 @@ TEST_F(TestPageSerde, Compression) {
test::random_bytes(page_size, 0, &faux_data[i]);
}
for (auto codec_type : codec_types) {
- std::unique_ptr<::arrow::Codec> codec = GetCodecFromArrow(codec_type);
+ auto codec = GetCodecFromArrow(codec_type);
std::vector<uint8_t> buffer;
for (int i = 0; i < num_pages; ++i) {
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index ea518fd98..5be1a8623 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -41,12 +41,6 @@
using std::string;
-namespace arrow {
-
-class Codec;
-
-} // namespace arrow
-
namespace parquet {
// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
diff --git a/cpp/src/parquet/metadata-test.cc b/cpp/src/parquet/metadata-test.cc
index 53653bd78..bcf911eab 100644
--- a/cpp/src/parquet/metadata-test.cc
+++ b/cpp/src/parquet/metadata-test.cc
@@ -16,9 +16,12 @@
// under the License.
#include "parquet/metadata.h"
+
#include <gtest/gtest.h>
+
#include "parquet/schema.h"
#include "parquet/statistics.h"
+#include "parquet/thrift.h"
#include "parquet/types.h"
namespace parquet {
@@ -219,12 +222,36 @@ TEST(ApplicationVersion, Basics) {
ASSERT_EQ(true, version.VersionLt(version1));
- ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96, SortOrder::UNKNOWN));
- ASSERT_TRUE(version.HasCorrectStatistics(Type::INT32, SortOrder::SIGNED));
- ASSERT_FALSE(version.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED));
- ASSERT_TRUE(version1.HasCorrectStatistics(Type::BYTE_ARRAY, SortOrder::SIGNED));
+ EncodedStatistics stats;
+ ASSERT_FALSE(version1.HasCorrectStatistics(Type::INT96, stats, SortOrder::UNKNOWN));
+ ASSERT_TRUE(version.HasCorrectStatistics(Type::INT32, stats, SortOrder::SIGNED));
+ ASSERT_FALSE(version.HasCorrectStatistics(Type::BYTE_ARRAY, stats, SortOrder::SIGNED));
+ ASSERT_TRUE(version1.HasCorrectStatistics(Type::BYTE_ARRAY, stats, SortOrder::SIGNED));
+ ASSERT_FALSE(
+ version1.HasCorrectStatistics(Type::BYTE_ARRAY, stats, SortOrder::UNSIGNED));
+ ASSERT_TRUE(version3.HasCorrectStatistics(Type::FIXED_LEN_BYTE_ARRAY, stats,
+ SortOrder::SIGNED));
+
+ // Check that the old stats are correct if min and max are the same
+ // regardless of sort order
+ EncodedStatistics stats_str;
+ stats_str.set_min("a").set_max("b");
+ ASSERT_FALSE(
+ version1.HasCorrectStatistics(Type::BYTE_ARRAY, stats_str, SortOrder::UNSIGNED));
+ stats_str.set_max("a");
+ ASSERT_TRUE(
+ version1.HasCorrectStatistics(Type::BYTE_ARRAY, stats_str, SortOrder::UNSIGNED));
+
+ // Check that the same holds true for ints
+ int32_t int_min = 100, int_max = 200;
+ EncodedStatistics stats_int;
+ stats_int.set_min(std::string(reinterpret_cast<const char*>(&int_min), 4))
+ .set_max(std::string(reinterpret_cast<const char*>(&int_max), 4));
+ ASSERT_FALSE(
+ version1.HasCorrectStatistics(Type::BYTE_ARRAY, stats_int, SortOrder::UNSIGNED));
+ stats_int.set_max(std::string(reinterpret_cast<const char*>(&int_min), 4));
ASSERT_TRUE(
- version3.HasCorrectStatistics(Type::FIXED_LEN_BYTE_ARRAY, SortOrder::SIGNED));
+ version1.HasCorrectStatistics(Type::BYTE_ARRAY, stats_int, SortOrder::UNSIGNED));
}
} // namespace metadata
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 9c66c7aab..f49393b60 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -55,7 +55,8 @@ static std::shared_ptr<RowGroupStatistics> MakeTypedColumnStats(
return std::make_shared<TypedRowGroupStatistics<DType>>(
descr, metadata.statistics.min_value, metadata.statistics.max_value,
metadata.num_values - metadata.statistics.null_count,
- metadata.statistics.null_count, metadata.statistics.distinct_count, true);
+ metadata.statistics.null_count, metadata.statistics.distinct_count,
+ metadata.statistics.__isset.max_value || metadata.statistics.__isset.min_value);
}
// Default behavior
return std::make_shared<TypedRowGroupStatistics<DType>>(
@@ -100,7 +101,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
for (auto encoding : meta_data.encodings) {
encodings_.push_back(FromThrift(encoding));
}
- stats_ = nullptr;
+ possible_stats_ = nullptr;
}
~ColumnChunkMetaDataImpl() {}
@@ -125,15 +126,22 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
// Eg: UTF8
inline bool is_stats_set() const {
DCHECK(writer_version_ != nullptr);
- return column_->meta_data.__isset.statistics &&
- writer_version_->HasCorrectStatistics(type(), descr_->sort_order());
+ // If the column statistics don't exist or column sort order is unknown
+ // we cannot use the column stats
+ if (!column_->meta_data.__isset.statistics ||
+ descr_->sort_order() == SortOrder::UNKNOWN) {
+ return false;
+ }
+ if (possible_stats_ == nullptr) {
+ possible_stats_ = MakeColumnStats(column_->meta_data, descr_);
+ }
+ EncodedStatistics encodedStatistics = possible_stats_->Encode();
+ return writer_version_->HasCorrectStatistics(type(), encodedStatistics,
+ descr_->sort_order());
}
inline std::shared_ptr<RowGroupStatistics> statistics() const {
- if (stats_ == nullptr && is_stats_set()) {
- stats_ = MakeColumnStats(column_->meta_data, descr_);
- }
- return stats_;
+ return is_stats_set() ? possible_stats_ : nullptr;
}
inline Compression::type compression() const {
@@ -169,7 +177,7 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
}
private:
- mutable std::shared_ptr<RowGroupStatistics> stats_;
+ mutable std::shared_ptr<RowGroupStatistics> possible_stats_;
std::vector<Encoding::type> encodings_;
const format::ColumnChunk* column_;
const ColumnDescriptor* descr_;
@@ -530,11 +538,16 @@ bool ApplicationVersion::VersionEq(const ApplicationVersion& other_version) cons
// parquet-mr/parquet-column/src/main/java/org/apache/parquet/CorruptStatistics.java
// PARQUET-686 has more disussion on statistics
bool ApplicationVersion::HasCorrectStatistics(Type::type col_type,
+ EncodedStatistics& statistics,
SortOrder::type sort_order) const {
// Parquet cpp version 1.3.0 onwards stats are computed correctly for all types
if ((application_ != "parquet-cpp") || (VersionLt(PARQUET_CPP_FIXED_STATS_VERSION()))) {
- // Only SIGNED are valid
- if (SortOrder::SIGNED != sort_order) {
+ // Only SIGNED are valid unless max and min are the same
+ // (in which case the sort order does not matter)
+ // Both bounds must be present for the comparison to be meaningful
+ const bool max_equals_min =
+ statistics.has_min && statistics.has_max && statistics.min() == statistics.max();
+ if (SortOrder::SIGNED != sort_order && !max_equals_min) {
return false;
}
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 79f4fdb35..7e29fe91a 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -85,7 +85,7 @@ class ApplicationVersion {
bool VersionEq(const ApplicationVersion& other_version) const;
// Checks if the Version has the correct statistics for a given column
- bool HasCorrectStatistics(Type::type primitive,
+ bool HasCorrectStatistics(Type::type primitive, EncodedStatistics& statistics,
SortOrder::type sort_order = SortOrder::SIGNED) const;
};
diff --git a/cpp/src/parquet/public-api-test.cc b/cpp/src/parquet/public-api-test.cc
index 958e97016..c0ef97a70 100644
--- a/cpp/src/parquet/public-api-test.cc
+++ b/cpp/src/parquet/public-api-test.cc
@@ -17,10 +17,10 @@
#include <gtest/gtest.h>
-#include "parquet/api/io.h"
-#include "parquet/api/reader.h"
-#include "parquet/api/schema.h"
-#include "parquet/api/writer.h"
+#include "parquet/api/io.h" // IWYU pragma: keep
+#include "parquet/api/reader.h" // IWYU pragma: keep
+#include "parquet/api/schema.h" // IWYU pragma: keep
+#include "parquet/api/writer.h" // IWYU pragma: keep
TEST(TestPublicAPI, DoesNotIncludeThrift) {
#ifdef _THRIFT_THRIFT_H_
diff --git a/cpp/src/parquet/thrift.h b/cpp/src/parquet/thrift.h
index 217cc76c0..9c665acfa 100644
--- a/cpp/src/parquet/thrift.h
+++ b/cpp/src/parquet/thrift.h
@@ -44,7 +44,7 @@
#include "parquet/exception.h"
#include "parquet/util/memory.h"
-#include "parquet/parquet_types.h"
+#include "parquet/parquet_types.h" // IWYU pragma: export
namespace parquet {
diff --git a/cpp/src/parquet/util/memory.cc b/cpp/src/parquet/util/memory.cc
index 5c76cd8a6..fde424aaf 100644
--- a/cpp/src/parquet/util/memory.cc
+++ b/cpp/src/parquet/util/memory.cc
@@ -32,31 +32,32 @@
#include "parquet/types.h"
using arrow::MemoryPool;
+using arrow::util::Codec;
namespace parquet {
-std::unique_ptr<::arrow::Codec> GetCodecFromArrow(Compression::type codec) {
- std::unique_ptr<::arrow::Codec> result;
+std::unique_ptr<Codec> GetCodecFromArrow(Compression::type codec) {
+ std::unique_ptr<Codec> result;
switch (codec) {
case Compression::UNCOMPRESSED:
break;
case Compression::SNAPPY:
- PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::SNAPPY, &result));
+ PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::SNAPPY, &result));
break;
case Compression::GZIP:
- PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::GZIP, &result));
+ PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::GZIP, &result));
break;
case Compression::LZO:
- PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::LZO, &result));
+ PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::LZO, &result));
break;
case Compression::BROTLI:
- PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::BROTLI, &result));
+ PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::BROTLI, &result));
break;
case Compression::LZ4:
- PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::LZ4, &result));
+ PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::LZ4, &result));
break;
case Compression::ZSTD:
- PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::ZSTD, &result));
+ PARQUET_THROW_NOT_OK(Codec::Create(::arrow::Compression::ZSTD, &result));
break;
default:
break;
diff --git a/cpp/src/parquet/util/memory.h b/cpp/src/parquet/util/memory.h
index 2eadb3326..cccafe8cb 100644
--- a/cpp/src/parquet/util/memory.h
+++ b/cpp/src/parquet/util/memory.h
@@ -37,15 +37,17 @@
#include "parquet/util/visibility.h"
namespace arrow {
+namespace util {
class Codec;
+} // namespace util
} // namespace arrow
namespace parquet {
PARQUET_EXPORT
-std::unique_ptr<::arrow::Codec> GetCodecFromArrow(Compression::type codec);
+std::unique_ptr<::arrow::util::Codec> GetCodecFromArrow(Compression::type codec);
static constexpr int64_t kInMemoryDefaultCapacity = 1024;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment