Skip to content

Instantly share code, notes, and snippets.

@sadikovi
Created April 26, 2018 08:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sadikovi/a2a5d79f4e4368ea50a5b6b5a0e58cce to your computer and use it in GitHub Desktop.
Save sadikovi/a2a5d79f4e4368ea50a5b6b5a0e58cce to your computer and use it in GitHub Desktop.
Mutable statistics buffer for collecting column statistics during writes (for PR https://github.com/sunchao/parquet-rs/pull/94).
// ----------------------------------------------------------------------
// Statistics updates
/// Accumulates statistics (null count and min/max) for a column while its
/// values are written, and hands them out as a `Statistics` value at the end.
struct MutableStatisticsBuffer<T: DataType> {
    // Statistics gathered so far; created with no min/max and a zero null count.
    typed: TypedStatistics<T>,
    // Ordering used for every min/max update, taken from the column order
    // supplied to `new`.
    sort_order: SortOrder,
}

impl<T: DataType> MutableStatisticsBuffer<T> {
    /// Creates an empty buffer for a column with the given `column_order`.
    ///
    /// `is_min_max_deprecated` is passed straight through to
    /// `TypedStatistics::new`.
    pub fn new(column_order: ColumnOrder, is_min_max_deprecated: bool) -> Self {
        let typed = TypedStatistics::new(None, None, 0, is_min_max_deprecated);
        let sort_order = column_order.sort_order();
        Self { typed, sort_order }
    }

    /// Records a single additional null value.
    pub fn inc_null_count(&mut self) {
        self.typed.add_null_count(1);
    }

    /// Folds a non-null `value` into the running min/max using this buffer's
    /// sort order.
    ///
    /// # Panics
    /// Panics if the sort order is `UNDEFINED`, or if the physical type does not
    /// support min/max updates for that order.
    pub fn update_min_max(&mut self, value: T::T) {
        let order = self.sort_order;
        self.typed.update_min_max(value, order);
    }

    /// Consumes the buffer and returns the collected statistics.
    pub fn as_statistics(self) -> Statistics {
        let MutableStatisticsBuffer { typed, .. } = self;
        typed.as_statistics()
    }
}
trait StatisticsUpdate<T: DataType> {
#[inline]
fn set_null_count(&mut self, nulls: u64);
#[inline]
fn add_null_count(&mut self, nulls: u64);
#[inline]
fn update_min_max_signed(&mut self, value: T::T);
#[inline]
fn update_min_max_unsigned(&mut self, value: T::T);
#[inline]
fn update_min_max(&mut self, value: T::T, sort_order: SortOrder) {
match sort_order {
SortOrder::SIGNED => self.update_min_max_signed(value),
SortOrder::UNSIGNED => self.update_min_max_unsigned(value),
SortOrder::UNDEFINED => panic!("Cannot update values with {} order", sort_order)
}
}
}
trait AsStatistics {
fn as_statistics(self) -> Statistics;
}
// Specializable fallbacks shared by every `TypedStatistics<T>`. Null-count
// bookkeeping is generic; min/max updates and `Statistics` conversion panic
// unless a concrete per-type impl overrides them.
impl<T: DataType> StatisticsUpdate<T> for TypedStatistics<T> {
    /// Increments the null count by `nulls`.
    #[inline]
    default fn add_null_count(&mut self, nulls: u64) {
        self.null_count = self.null_count + nulls;
    }

    /// Replaces the null count with `nulls`.
    #[inline]
    default fn set_null_count(&mut self, nulls: u64) {
        self.null_count = nulls;
    }

    /// Fallback for types without an unsigned ordering: always panics.
    #[inline]
    default fn update_min_max_unsigned(&mut self, _value: T::T) {
        panic!("Unsupported unsigned min/max update");
    }

    /// Fallback for types without a signed ordering: always panics.
    #[inline]
    default fn update_min_max_signed(&mut self, _value: T::T) {
        panic!("Unsupported signed min/max update");
    }
}

impl<T: DataType> AsStatistics for TypedStatistics<T> {
    /// Fallback for types without a `Statistics` variant: always panics.
    default fn as_statistics(self) -> Statistics {
        panic!("Unsupported statistics conversion");
    }
}
// BOOLEAN statistics: `false < true`, and signed and unsigned updates behave
// identically. For booleans the minimum is logical AND and the maximum is
// logical OR.
impl StatisticsUpdate<BoolType> for TypedStatistics<BoolType> {
    /// Folds `value` into min/max (`false < true`).
    #[inline]
    fn update_min_max_signed(&mut self, value: bool) {
        self.min = Some(self.min.map_or(value, |current| current & value));
        self.max = Some(self.max.map_or(value, |current| current | value));
    }

    /// Same as the signed update: booleans order identically either way.
    #[inline]
    fn update_min_max_unsigned(&mut self, value: bool) {
        self.min = Some(self.min.map_or(value, |current| current & value));
        self.max = Some(self.max.map_or(value, |current| current | value));
    }
}

impl AsStatistics for TypedStatistics<BoolType> {
    /// Wraps these statistics as `Statistics::Boolean`.
    fn as_statistics(self) -> Statistics {
        Statistics::Boolean(self)
    }
}
// INT32 statistics, comparable either as signed `i32` or as the unsigned
// reinterpretation of the same 32 bits.
impl StatisticsUpdate<Int32Type> for TypedStatistics<Int32Type> {
    /// Folds `value` into min/max using signed `i32` comparison.
    #[inline]
    fn update_min_max_signed(&mut self, value: i32) {
        self.min = Some(match self.min {
            Some(current) if current <= value => current,
            _ => value,
        });
        self.max = Some(match self.max {
            Some(current) if current >= value => current,
            _ => value,
        });
    }

    /// Folds `value` into min/max comparing the bits as `u32`; stored values
    /// keep their original `i32` bit pattern.
    #[inline]
    fn update_min_max_unsigned(&mut self, value: i32) {
        let unsigned = value as u32;
        self.min = Some(match self.min {
            Some(current) if (current as u32) <= unsigned => current,
            _ => value,
        });
        self.max = Some(match self.max {
            Some(current) if (current as u32) >= unsigned => current,
            _ => value,
        });
    }
}

impl AsStatistics for TypedStatistics<Int32Type> {
    /// Wraps these statistics as `Statistics::Int32`.
    fn as_statistics(self) -> Statistics {
        Statistics::Int32(self)
    }
}
// INT64 statistics, comparable either as signed `i64` or as the unsigned
// reinterpretation of the same 64 bits.
impl StatisticsUpdate<Int64Type> for TypedStatistics<Int64Type> {
    /// Folds `value` into min/max using signed `i64` comparison.
    #[inline]
    fn update_min_max_signed(&mut self, value: i64) {
        self.min = Some(match self.min {
            Some(current) if current <= value => current,
            _ => value,
        });
        self.max = Some(match self.max {
            Some(current) if current >= value => current,
            _ => value,
        });
    }

    /// Folds `value` into min/max comparing the bits as `u64`; stored values
    /// keep their original `i64` bit pattern.
    #[inline]
    fn update_min_max_unsigned(&mut self, value: i64) {
        let unsigned = value as u64;
        self.min = Some(match self.min {
            Some(current) if (current as u64) <= unsigned => current,
            _ => value,
        });
        self.max = Some(match self.max {
            Some(current) if (current as u64) >= unsigned => current,
            _ => value,
        });
    }
}

impl AsStatistics for TypedStatistics<Int64Type> {
    /// Wraps these statistics as `Statistics::Int64`.
    fn as_statistics(self) -> Statistics {
        Statistics::Int64(self)
    }
}
// INT96 statistics: min/max tracking is not supported for this physical type,
// so both update paths panic; only the conversion to `Statistics` is provided.
impl StatisticsUpdate<Int96Type> for TypedStatistics<Int96Type> {
    /// Always panics: INT96 min/max updates are not supported.
    #[inline]
    fn update_min_max_unsigned(&mut self, _value: Int96) {
        panic!("Int96 statistics update is not supported");
    }

    /// Always panics: INT96 min/max updates are not supported.
    #[inline]
    fn update_min_max_signed(&mut self, _value: Int96) {
        panic!("Int96 statistics update is not supported");
    }
}

impl AsStatistics for TypedStatistics<Int96Type> {
    /// Wraps these statistics as `Statistics::Int96`.
    fn as_statistics(self) -> Statistics {
        Statistics::Int96(self)
    }
}
// FLOAT statistics. Only the signed update is overridden; an unsigned update
// falls back to the generic (panicking) default.
impl StatisticsUpdate<FloatType> for TypedStatistics<FloatType> {
    /// Folds `value` into min/max using IEEE-754 numeric comparison.
    ///
    /// NaN values are skipped. NaN is unordered, and the Parquet format asks
    /// writers not to store NaN as min/max. Previously an all-NaN column
    /// produced `Some(NaN)` bounds. If only NaNs are seen, min/max stay unset.
    #[inline]
    fn update_min_max_signed(&mut self, value: f32) {
        if value.is_nan() {
            return;
        }
        self.min = Some(self.min.map_or(value, |min| min.min(value)));
        self.max = Some(self.max.map_or(value, |max| max.max(value)));
    }
}

impl AsStatistics for TypedStatistics<FloatType> {
    /// Wraps these statistics as `Statistics::Float`.
    fn as_statistics(self) -> Statistics {
        Statistics::Float(self)
    }
}
// DOUBLE statistics. Only the signed update is overridden; an unsigned update
// falls back to the generic (panicking) default.
impl StatisticsUpdate<DoubleType> for TypedStatistics<DoubleType> {
    /// Folds `value` into min/max using IEEE-754 numeric comparison.
    ///
    /// NaN values are skipped. NaN is unordered, and the Parquet format asks
    /// writers not to store NaN as min/max. Previously an all-NaN column
    /// produced `Some(NaN)` bounds. If only NaNs are seen, min/max stay unset.
    #[inline]
    fn update_min_max_signed(&mut self, value: f64) {
        if value.is_nan() {
            return;
        }
        self.min = Some(self.min.map_or(value, |min| min.min(value)));
        self.max = Some(self.max.map_or(value, |max| max.max(value)));
    }
}

impl AsStatistics for TypedStatistics<DoubleType> {
    /// Wraps these statistics as `Statistics::Double`.
    fn as_statistics(self) -> Statistics {
        Statistics::Double(self)
    }
}
// BYTE_ARRAY statistics.
impl StatisticsUpdate<ByteArrayType> for TypedStatistics<ByteArrayType> {
    /// Folds `value` into min/max by comparing raw bytes lexicographically as
    /// unsigned values.
    ///
    /// If either bound is unset, both are reset to `value`. `value` is cloned
    /// at most once; it is moved into the last slot that needs it.
    #[inline]
    fn update_min_max_unsigned(&mut self, value: ByteArray) {
        // Decide first, so the borrows of `self.min`/`self.max` end before the
        // assignments below. `&[u8]` ordering is lexicographic on unsigned bytes.
        let (replace_min, replace_max) = match (self.min.as_ref(), self.max.as_ref()) {
            (Some(min), Some(max)) => (value.data() < min.data(), value.data() > max.data()),
            _ => (true, true),
        };
        match (replace_min, replace_max) {
            (true, true) => {
                self.min = Some(value.clone());
                self.max = Some(value);
            }
            (true, false) => self.min = Some(value),
            (false, true) => self.max = Some(value),
            (false, false) => {}
        }
    }

    /// Not implemented yet; always panics.
    fn update_min_max_signed(&mut self, _value: ByteArray) {
        // TODO: Used to compare decimals
        unimplemented!();
    }
}

impl AsStatistics for TypedStatistics<ByteArrayType> {
    /// Wraps these statistics as `Statistics::ByteArray`.
    fn as_statistics(self) -> Statistics {
        Statistics::ByteArray(self)
    }
}
// FIXED_LEN_BYTE_ARRAY statistics; values are carried as `ByteArray`.
impl StatisticsUpdate<FixedLenByteArrayType> for TypedStatistics<FixedLenByteArrayType> {
    /// Folds `value` into min/max by comparing raw bytes lexicographically as
    /// unsigned values. This is the UNSIGNED order for fixed-length byte arrays.
    ///
    /// If either bound is unset, both are reset to `value`. `value` is cloned
    /// at most once.
    #[inline]
    fn update_min_max_unsigned(&mut self, value: ByteArray) {
        // Decide first, so the borrows of `self.min`/`self.max` end before the
        // assignments below. `&[u8]` ordering is lexicographic on unsigned bytes.
        let (replace_min, replace_max) = match (self.min.as_ref(), self.max.as_ref()) {
            (Some(min), Some(max)) => (value.data() < min.data(), value.data() > max.data()),
            _ => (true, true),
        };
        match (replace_min, replace_max) {
            (true, true) => {
                self.min = Some(value.clone());
                self.max = Some(value);
            }
            (true, false) => self.min = Some(value),
            (false, true) => self.max = Some(value),
            (false, false) => {}
        }
    }

    /// Not implemented yet; always panics.
    fn update_min_max_signed(&mut self, _value: ByteArray) {
        // TODO: signed comparison (presumably for decimals stored as
        // fixed-length two's complement) — confirm requirements.
        unimplemented!();
    }
}

impl AsStatistics for TypedStatistics<FixedLenByteArrayType> {
    /// Wraps these statistics as `Statistics::FixedLenByteArray`.
    fn as_statistics(self) -> Statistics {
        Statistics::FixedLenByteArray(self)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment