Skip to content

Instantly share code, notes, and snippets.

@mtth
Last active August 9, 2023 20:59
Show Gist options
  • Save mtth/1aec40375fbcb077aee7 to your computer and use it in GitHub Desktop.
Save mtth/1aec40375fbcb077aee7 to your computer and use it in GitHub Desktop.
Avro logical types
/* jshint node: true */
'use strict';
const avro = require('avsc'),
util = require('util');
/**
 * Logical type that represents native `Date` objects as longs.
 *
 * Its resolver also lets data that was written as a plain `long`, as a
 * `string`, or as a `timestamp-millis` logical type be read back as dates.
 */
class DateType extends avro.types.LogicalType {
  /** Wraps the decoded underlying value in a `Date`. */
  _fromValue(val) {
    return new Date(val);
  }

  /**
   * Converts a `Date` to its numeric time value. Anything that is not a
   * `Date` yields `undefined`.
   */
  _toValue(date) {
    if (!(date instanceof Date)) {
      return undefined;
    }
    return Number(date);
  }

  /**
   * Returns the conversion function used to read values written with `type`,
   * or `undefined` when `type` is not one of the supported writer types.
   */
  _resolve(type) {
    const readable = avro.Type.isType(
      type,
      'long',
      'string',
      'logical:timestamp-millis'
    );
    return readable ? this._fromValue : undefined;
  }
}
/* jshint node: true */
'use strict';
var avro = require('avsc'),
util = require('util');
/**
 * Sample decimal logical type implementation.
 *
 * Values are wrapped in a very simple `Decimal` class that is created per
 * type instance (exposed as `this.Decimal`) and carries the type's precision
 * and scale on its prototype.
 *
 * @param {Object} attrs Schema attributes; `precision` and `scale` are read
 * from it.
 * @param {Object} opts Options, forwarded untouched to `LogicalType`.
 * @throws {Error} If `precision` is not a positive integer, if `scale` is not
 * an integer in `[0, precision]`, or if the underlying `fixed` type is too
 * small to hold `precision` digits.
 */
function DecimalType(attrs, opts) {
  avro.types.LogicalType.call(this, attrs, opts);

  var precision = attrs.precision;
  if (precision !== (precision | 0) || precision <= 0) {
    throw new Error('invalid precision');
  }

  var scale = attrs.scale;
  if (scale !== (scale | 0) || scale < 0 || scale > precision) {
    throw new Error('invalid scale');
  }

  var type = this.underlyingType;
  if (avro.Type.isType(type, 'fixed')) {
    // A signed integer of `size` bytes holds magnitudes up to
    // 2^(8 * size - 1) - 1, i.e. floor((8 * size - 1) * log10(2)) full decimal
    // digits. Working with the exponent avoids overflowing to Infinity (which
    // `| 0` would then collapse to 0) for large sizes.
    var maxPrecision = Math.floor((8 * type.size - 1) * Math.LN2 / Math.LN10);
    if (precision > maxPrecision) {
      throw new Error('fixed size too small to hold required precision');
    }
  }

  this.Decimal = Decimal;

  /** Minimal value wrapper: only stores the unscaled integer. */
  function Decimal(unscaled) { this.unscaled = unscaled; }
  Decimal.prototype.precision = precision;
  Decimal.prototype.scale = scale;
  Decimal.prototype.toNumber = function () {
    // Divide by 10^scale rather than multiplying by 10^-scale: negative powers
    // of ten are not exactly representable, so the product picks up an extra
    // rounding error (e.g. 123.56000000000002 instead of 123.56).
    return this.unscaled / Math.pow(10, scale);
  };
}
util.inherits(DecimalType, avro.types.LogicalType);
/**
 * Decodes a buffer into a `Decimal`: the whole buffer is read as one signed
 * big-endian integer, which becomes the unscaled value.
 */
DecimalType.prototype._fromValue = function (buf) {
  var Decimal = this.Decimal;
  var unscaled = buf.readIntBE(0, buf.length);
  return new Decimal(unscaled);
};
/**
 * Serializes a `Decimal` into the bytes expected by the underlying type.
 *
 * For a `fixed` underlying type the buffer has exactly `type.size` bytes.
 * Otherwise the smallest two's-complement width able to hold `dec.unscaled`
 * is used.
 *
 * @param {Decimal} dec Instance of this type's `Decimal` class.
 * @returns {Buffer} Signed big-endian encoding of `dec.unscaled`.
 * @throws {Error} If `dec` is not an instance of this type's `Decimal`.
 */
DecimalType.prototype._toValue = function (dec) {
  if (!(dec instanceof this.Decimal)) {
    throw new Error('invalid decimal');
  }
  // Size the buffer from the unscaled integer, not from the wrapper object
  // (which coerces to NaN and always produced a 1-byte buffer).
  var unscaled = dec.unscaled;
  var type = this.underlyingType;
  var buf;
  if (avro.Type.isType(type, 'fixed')) {
    buf = Buffer.alloc(type.size);
  } else {
    // n bytes cover [-2^(8n - 1), 2^(8n - 1) - 1]; grow until the value fits.
    // `writeIntBE` supports at most 6 bytes, so stop there and let it report
    // values that are still out of range.
    var size = 1;
    var limit = 128;
    while (size < 6 && (unscaled < -limit || unscaled >= limit)) {
      size++;
      limit *= 256;
    }
    buf = Buffer.alloc(size);
  }
  buf.writeIntBE(unscaled, 0, buf.length);
  return buf;
};
/**
 * Allows reading data written by another decimal logical type, provided its
 * precision and scale are identical to this type's. Values are then passed
 * through unchanged. Returns `undefined` for any other writer type.
 */
DecimalType.prototype._resolve = function (type) {
  if (!avro.Type.isType(type, 'logical:decimal')) {
    return undefined;
  }
  var theirs = type.Decimal.prototype;
  var ours = this.Decimal.prototype;
  if (theirs.precision === ours.precision && theirs.scale === ours.scale) {
    return function (dec) { return dec; };
  }
};
/**
 * Copies this type's precision and scale onto the attributes being exported.
 */
DecimalType.prototype._export = function (attrs) {
  var proto = this.Decimal.prototype;
  attrs.precision = proto.precision;
  attrs.scale = proto.scale;
};
/* jshint node: true */
'use strict';
var avro = require('avsc'),
util = require('util');
/**
 * A basic optional type.
 *
 * The underlying type is assumed to be a union of the form `["null", ???]`.
 *
 * Possible enhancements (not implemented here):
 *
 * + Checking in the constructor that the underlying type really is a union
 *   of that form.
 * + Code-generating the conversion methods (especially a constructor for
 *   `_toValue`).
 *
 * Since unions do not support annotations, this logical type has to be wired
 * in through a custom `typeHook` rather than via the `logicalTypes` option.
 *
 */
function OptionalType(attrs, opts) {
  avro.types.LogicalType.call(this, attrs, opts);
  // Remember the branch name of the union's second type; it is the key used
  // to wrap and unwrap non-null values.
  var branches = this.underlyingType.types;
  this._name = branches[1].branchName;
}
util.inherits(OptionalType, avro.types.LogicalType);
/**
 * Unwraps a decoded union value: `null` stays `null`, anything else is
 * looked up under the stored branch name.
 */
OptionalType.prototype._fromValue = function (val) {
  if (val === null) {
    return null;
  }
  return val[this._name];
};
/**
 * Wraps a value for the underlying union: `null` stays `null`, anything else
 * becomes a single-key object keyed by the stored branch name.
 */
OptionalType.prototype._toValue = function (any) {
  if (any !== null) {
    var wrapped = {};
    wrapped[this._name] = any;
    return wrapped;
  }
  return null;
};
@gurpreetatwal
Copy link

For DecimalType the calls to new Buffer() should be changed to Buffer.alloc()

@cmbuckley
Copy link

The DecimalType should also allocate the buffer size according to the unscaled value, not the scaled value:

DecimalType.prototype._toValue = function (dec) {
  if (!(dec instanceof this.Decimal)) {
    throw new Error('invalid decimal');
  }

  var type = this.underlyingType;
  var unscaled = dec.unscaled;
  var buf;

  if (avro.Type.isType(type, 'fixed')) {
    buf = Buffer.alloc(type.size);
  } else {
    // n bytes cover [-2^(8n - 1), 2^(8n - 1) - 1]; grow until the value fits.
    // A log-based size under-allocates for positive values in
    // [2^(8k - 1), 2^(8k)), e.g. 128 would get a single byte.
    // `writeIntBE` supports at most 6 bytes, so stop there and let it report
    // values that are still out of range.
    var size = 1;
    var limit = 128;
    while (size < 6 && (unscaled < -limit || unscaled >= limit)) {
      size++;
      limit *= 256;
    }
    buf = Buffer.alloc(size);
  }

  buf.writeIntBE(unscaled, 0, buf.length);
  return buf;
};

@balaby25
Copy link

Would it be possible to post a hello-world Node.js example that uses the decimal logical type?
For example, I am trying to generate Avro from a CSV file in which one of the columns holds decimals (for example, 10045.36),
but I have not been able to work my way through it.
The date.js example works like a charm.

@capioc
Copy link

capioc commented Aug 9, 2023

After playing around a bit, here is my version in TypeScript. I had to change the math formulas because I was getting some inaccurate results.
It seems to conform to the library's API, but I am not sure whether I missed something. Any comments are more than welcome.

import * as avro from "avsc";
/**
 * Loosely typed object shape: any string key may be read, and every value is
 * `any`, so property accesses on a value cast to this type are not checked.
 */
interface UnderLyingType {
    [key: string]: any;
}
/**
 * Plain decimal value: an unscaled integer together with the precision and
 * scale it should be interpreted with.
 */
export class Decimal {
  constructor(
    public unscaled: number,
    public precision: number,
    public scale: number,
  ) {}

  /**
   * Converts to a JavaScript number by shifting the decimal point `scale`
   * places to the left.
   *
   * Division by 10^scale is used on purpose: multiplying by 10^-scale
   * produced results such as 123.56000007 instead of 123.56.
   */
  toNumber(): number {
    const divisor = 10 ** this.scale;
    return this.unscaled / divisor;
  }
}

/**
 * Decimal logical type that exposes values as plain JavaScript numbers.
 *
 * Decoding yields a `number`; encoding accepts either a `Decimal` or a
 * number, which is scaled and rounded to this type's `scale`.
 */
export class DecimalType extends avro.types.LogicalType {
  /** Holds this type's precision and scale (its `unscaled` is always 0). */
  decimal: Decimal;

  /**
   * @param attrs Schema attributes; `precision` and `scale` are read from it.
   * @param opts Options, forwarded untouched to `LogicalType`.
   * @throws Error If `precision` is not a positive integer, if `scale` is not
   * an integer in `[0, precision]`, or if the underlying `fixed` type is too
   * small to hold `precision` digits.
   */
  // eslint-disable-next-line @typescript-eslint/no-explicit-any -- schema attributes and options arrive loosely typed
  constructor(attrs: any, opts?: any) {
    super(attrs, opts);

    const { precision, scale } = attrs;
    if (!Number.isInteger(precision) || precision <= 0) {
      throw new Error("invalid precision");
    }
    if (!Number.isInteger(scale) || scale < 0 || scale > precision) {
      throw new Error("invalid scale");
    }

    const type = this.underlyingType as UnderLyingType;
    if (avro.Type.isType(type, "fixed")) {
      // A signed integer of `size` bytes holds magnitudes up to
      // 2^(8 * size - 1) - 1, i.e. floor((8 * size - 1) * log10(2)) digits.
      const maxPrecision = Math.floor((8 * type.size - 1) * Math.log10(2));
      if (precision > maxPrecision) {
        throw new Error("fixed size too small to hold required precision");
      }
    }

    this.decimal = new Decimal(0, precision, scale);
  }

  /**
   * Decodes a buffer (read whole, as one signed big-endian integer) into a
   * number scaled by this type's `scale`.
   */
  _fromValue(buf: Buffer): number {
    const { precision, scale } = this.decimal;
    return new Decimal(buf.readIntBE(0, buf.length), precision, scale).toNumber();
  }

  /**
   * Encodes a value as the signed big-endian bytes of its unscaled integer.
   *
   * For a `fixed` underlying type the buffer has exactly `type.size` bytes;
   * otherwise the smallest two's-complement width that fits is used.
   *
   * @param dec A `Decimal`, or a number that is multiplied by 10^scale and
   * rounded to obtain the unscaled value.
   * @throws TypeError If the resulting unscaled value is not a finite number.
   */
  _toValue(dec: Decimal | number): Buffer {
    const { precision, scale } = this.decimal;
    const decimal =
      dec instanceof Decimal
        ? dec
        : new Decimal(Math.round(dec * 10 ** scale), precision, scale);

    const unscaled = decimal.unscaled;
    if (!Number.isFinite(unscaled)) {
      // NaN would otherwise be written silently as 0 for fixed types.
      throw new TypeError("invalid decimal");
    }

    const type = this.underlyingType as UnderLyingType;
    let byteLength: number;
    if (avro.Type.isType(type, "fixed")) {
      byteLength = type.size;
    } else {
      // n bytes cover [-2^(8n - 1), 2^(8n - 1) - 1]; grow until the value
      // fits. Unlike a log2-based size this also handles 0 (log2(0) is
      // -Infinity). `writeIntBE` supports at most 6 bytes, so stop there and
      // let it report values that are still out of range.
      const maxIntBytes = 6;
      let limit = 128;
      byteLength = 1;
      while (byteLength < maxIntBytes && (unscaled < -limit || unscaled >= limit)) {
        byteLength += 1;
        limit *= 256;
      }
    }

    const buf = Buffer.alloc(byteLength);
    buf.writeIntBE(unscaled, 0, buf.length);
    return buf;
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment