Sample Avro LogicalType
implementations.
Last active
August 9, 2023 20:59
-
-
Save mtth/1aec40375fbcb077aee7 to your computer and use it in GitHub Desktop.
Avro logical types
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* jshint node: true */ | |
'use strict'; | |
const avro = require('avsc'), | |
util = require('util'); | |
/**
 * Logical type that lets applications work with native `Date` objects while
 * the data itself is encoded as a `long`.
 *
 * Thanks to its resolver it can also read values that were written as strings
 * or with the `timestamp-millis` logical type.
 */
class DateType extends avro.types.LogicalType {
  /** Wraps the decoded underlying value in a `Date`. */
  _fromValue(val) {
    const decoded = new Date(val);
    return decoded;
  }

  /**
   * Converts a `Date` to its numeric (epoch milliseconds) form. Anything that
   * is not a `Date` yields `undefined`.
   */
  _toValue(date) {
    if (!(date instanceof Date)) {
      return undefined;
    }
    return +date;
  }

  /**
   * Returns the conversion function used to read data written with `type`,
   * or `undefined` when `type` is not one of the supported writer types.
   */
  _resolve(type) {
    const readable = avro.Type.isType(
      type,
      'long',
      'string',
      'logical:timestamp-millis'
    );
    return readable ? this._fromValue : undefined;
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* jshint node: true */ | |
'use strict'; | |
var avro = require('avsc'), | |
util = require('util'); | |
/**
 * Sample decimal logical type implementation.
 *
 * Values are wrapped in a very simple `Decimal` class that is created per type
 * instance and exposed as `this.Decimal`; the type's precision and scale live
 * on that class's prototype.
 *
 * @param {Object} attrs Schema attributes. `precision` must be a positive
 * integer; `scale` must be an integer in `[0, precision]` and defaults to 0
 * when omitted (as in the Avro specification).
 * @param {Object} opts Options forwarded to `avro.types.LogicalType`.
 * @throws {Error} If `precision` or `scale` is invalid, or if the underlying
 * type is a `fixed` too small to hold `precision` digits.
 */
function DecimalType(attrs, opts) {
  avro.types.LogicalType.call(this, attrs, opts);

  var precision = attrs.precision;
  if (precision !== (precision | 0) || precision <= 0) {
    throw new Error('invalid precision');
  }

  // The scale attribute is optional in Avro schemas; absent means zero.
  var scale = attrs.scale === undefined ? 0 : attrs.scale;
  if (scale !== (scale | 0) || scale < 0 || scale > precision) {
    throw new Error('invalid scale');
  }

  var type = this.underlyingType;
  if (avro.Type.isType(type, 'fixed')) {
    // Number of decimal digits that fit in a signed integer of `size` bytes.
    var size = type.size;
    var maxPrecision = Math.log(Math.pow(2, 8 * size - 1) - 1) / Math.log(10);
    if (precision > (maxPrecision | 0)) {
      throw new Error('fixed size too small to hold required precision');
    }
  }

  function Decimal(unscaled) { this.unscaled = unscaled; }
  Decimal.prototype.precision = precision;
  Decimal.prototype.scale = scale;
  Decimal.prototype.toNumber = function () {
    // Dividing by 10^scale is correctly rounded, whereas multiplying by the
    // inexact 10^-scale yields results such as 123.56000000000002.
    return this.unscaled / Math.pow(10, scale);
  };
  this.Decimal = Decimal;
}
util.inherits(DecimalType, avro.types.LogicalType);

/**
 * Reads the whole buffer as one big-endian signed integer and wraps it in
 * this type's `Decimal` class as the unscaled value.
 */
DecimalType.prototype._fromValue = function (buf) {
  var unscaled = buf.readIntBE(0, buf.length);
  return new this.Decimal(unscaled);
};
/**
 * Serializes a `Decimal` as the big-endian two's-complement bytes of its
 * unscaled value.
 *
 * For a `fixed` underlying type the buffer has exactly `type.size` bytes;
 * otherwise it is the smallest buffer able to hold the unscaled value.
 *
 * @param {Decimal} dec Instance of this type's own `Decimal` class.
 * @returns {Buffer} The encoded unscaled value.
 * @throws {Error} 'invalid decimal' if `dec` is not an instance of
 * `this.Decimal`. `Buffer#writeIntBE` additionally throws when the value does
 * not fit in the buffer or the buffer is wider than the 6 bytes it supports.
 */
DecimalType.prototype._toValue = function (dec) {
  if (!(dec instanceof this.Decimal)) {
    throw new Error('invalid decimal');
  }
  // Size the buffer from the unscaled integer, not from the wrapper object.
  var unscaled = dec.unscaled;
  var type = this.underlyingType;
  var size;
  if (avro.Type.isType(type, 'fixed')) {
    size = type.size;
  } else {
    // Find the smallest n such that -2^(8n-1) <= unscaled <= 2^(8n-1) - 1.
    // The search stops at 6 bytes, the widest integer writeIntBE can write;
    // larger values are left for writeIntBE to reject.
    size = 1;
    var bound = 128;
    while (size < 6 && (unscaled >= bound || unscaled < -bound)) {
      size++;
      bound *= 256;
    }
  }
  var buf = Buffer.alloc(size);
  buf.writeIntBE(unscaled, 0, buf.length);
  return buf;
};
/**
 * Allows reading data written by another decimal logical type, provided its
 * precision and scale both match this type's. In that case the decoded value
 * is passed through unchanged; otherwise nothing is returned.
 */
DecimalType.prototype._resolve = function (type) {
  if (!avro.Type.isType(type, 'logical:decimal')) {
    return;
  }
  var theirs = type.Decimal.prototype;
  var ours = this.Decimal.prototype;
  if (theirs.precision === ours.precision && theirs.scale === ours.scale) {
    return function (dec) { return dec; };
  }
};
/**
 * Copies this type's precision and scale onto the attributes object that is
 * being exported.
 */
DecimalType.prototype._export = function (attrs) {
  var proto = this.Decimal.prototype;
  attrs.precision = proto.precision;
  attrs.scale = proto.scale;
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* jshint node: true */ | |
'use strict'; | |
var avro = require('avsc'), | |
util = require('util'); | |
/**
 * A basic optional type.
 *
 * The underlying type is assumed to be a union of the form `["null", ???]`;
 * the branch name of its second member is remembered so that values can be
 * wrapped and unwrapped.
 *
 * Possible enhancements:
 *
 * + Checking in the constructor that the underlying type really is a union of
 *   the expected form.
 * + Code-generating the conversion methods (especially a constructor for
 *   `_toValue`).
 *
 * Note that since unions do not support annotations, this logical type must be
 * used via a custom `typeHook` (rather than via the `logicalTypes` option).
 */
function OptionalType(attrs, opts) {
  avro.types.LogicalType.call(this, attrs, opts);
  var branches = this.underlyingType.types;
  var valueBranch = branches[1];
  this._name = valueBranch.branchName;
}
util.inherits(OptionalType, avro.types.LogicalType);

/**
 * Unwraps a decoded union value: `null` stays `null`, anything else is
 * replaced by the property stored under the remembered branch name.
 */
OptionalType.prototype._fromValue = function (val) {
  if (val === null) {
    return null;
  }
  return val[this._name];
};
/**
 * Wraps a value for the underlying union: `null` is passed through, any other
 * value is placed in a fresh object under the remembered branch name.
 */
OptionalType.prototype._toValue = function (any) {
  if (any !== null) {
    var wrapped = {};
    wrapped[this._name] = any;
    return wrapped;
  }
  return null;
};
The DecimalType should also allocate the buffer size according to the unscaled value, not the scaled value:
/**
 * Serializes a `Decimal` as the big-endian two's-complement bytes of its
 * unscaled value.
 *
 * For a `fixed` underlying type the buffer has exactly `type.size` bytes;
 * otherwise it is the smallest buffer able to hold the unscaled value.
 *
 * @param {Decimal} dec Instance of this type's own `Decimal` class.
 * @returns {Buffer} The encoded unscaled value.
 * @throws {Error} 'invalid decimal' if `dec` is not an instance of
 * `this.Decimal`. `Buffer#writeIntBE` additionally throws when the value does
 * not fit in the buffer or the buffer is wider than the 6 bytes it supports.
 */
DecimalType.prototype._toValue = function (dec) {
  if (!(dec instanceof this.Decimal)) {
    throw new Error('invalid decimal');
  }
  var type = this.underlyingType;
  var unscaled = dec.unscaled;
  var size;
  if (avro.Type.isType(type, 'fixed')) {
    size = type.size;
  } else {
    // Find the smallest n such that -2^(8n-1) <= unscaled <= 2^(8n-1) - 1.
    // A logarithm-based estimate is one byte short for positive values whose
    // top bit is set (e.g. 128), so widen step by step instead. The search
    // stops at 6 bytes, the widest integer writeIntBE can write.
    size = 1;
    var bound = 128;
    while (size < 6 && (unscaled >= bound || unscaled < -bound)) {
      size++;
      bound *= 256;
    }
  }
  var buf = Buffer.alloc(size);
  buf.writeIntBE(unscaled, 0, buf.length);
  return buf;
};
Would it be possible to post a hello-world Node.js example that uses the decimal logical type?
For example, I am trying to generate Avro from a CSV file in which one of the columns holds decimals such as 10045.36,
but I have not been able to work my way through it.
The date.js example works like a charm.
After playing around a bit, here is my version in TypeScript. I had to change the math formulas because I was getting some inaccurate results.
It seems to conform to the library's API, but I am not sure whether I missed something. Any comments are more than welcome.
import * as avro from "avsc";
// Loose, string-keyed view of the underlying type; `this.underlyingType` is
// cast to it below so that properties such as `size` can be read.
interface UnderLyingType {
[key: string]: any;
}
/**
 * Plain value object: an unscaled number together with the precision and
 * scale it should be interpreted with.
 */
export class Decimal {
  unscaled: number;
  precision: number;
  scale: number;

  constructor(unscaled: number, precision: number, scale: number) {
    this.unscaled = unscaled;
    this.precision = precision;
    this.scale = scale;
  }

  /** Returns the scaled value, i.e. `unscaled / 10^scale`. */
  toNumber() {
    // Divide by 10^scale instead of multiplying by 10^-scale: the
    // multiplication produced values like 123.56000007 instead of 123.56.
    const divisor = Math.pow(10, this.scale);
    return this.unscaled / divisor;
  }
}
/**
 * Decimal logical type that exposes plain numbers to the application.
 *
 * Decoding returns a `number` (the unscaled integer divided by 10^scale);
 * encoding accepts either a `Decimal` or a finite number, which is rounded to
 * the type's scale.
 */
export class DecimalType extends avro.types.LogicalType {
  /** Template holding this type's precision and scale (its `unscaled` is unused). */
  decimal: Decimal;

  /**
   * @param attrs Schema attributes; `precision` must be a positive integer and
   * `scale` an integer in `[0, precision]` (0 when omitted, as in the Avro
   * specification).
   * @param opts Options forwarded to `avro.types.LogicalType`.
   * @throws Error when precision or scale is invalid, or when a `fixed`
   * underlying type is too small for the requested precision.
   */
  constructor(attrs, opts) {
    super(attrs, opts);

    const precision = attrs.precision;
    if (precision !== Math.trunc(precision) || precision <= 0) {
      throw new Error("invalid precision");
    }

    // The scale attribute is optional in Avro schemas; absent means zero.
    const scale = attrs.scale === undefined ? 0 : attrs.scale;
    if (scale !== Math.trunc(scale) || scale < 0 || scale > precision) {
      throw new Error("invalid scale");
    }

    const type = this.underlyingType as UnderLyingType;
    if (avro.Type.isType(type, "fixed")) {
      // Number of decimal digits that fit in a signed integer of `size` bytes.
      const size = type.size;
      const maxPrecision =
        Math.log(Math.pow(2, 8 * size - 1) - 1) / Math.log(10);
      if (precision > Math.trunc(maxPrecision)) {
        throw new Error("fixed size too small to hold required precision");
      }
    }

    this.decimal = new Decimal(0, precision, scale);
  }

  /** Decodes a big-endian signed integer buffer into a scaled number. */
  _fromValue(buf) {
    const unscaled = buf.readIntBE(0, buf.length);
    const { precision, scale } = this.decimal;
    return new Decimal(unscaled, precision, scale).toNumber();
  }

  /**
   * Encodes a `Decimal` or a finite number as the big-endian two's-complement
   * bytes of its unscaled value.
   *
   * @throws TypeError "invalid decimal" when `dec` is neither a `Decimal` nor
   * a finite number. `Buffer#writeIntBE` additionally throws when the value
   * does not fit in the buffer or the buffer is wider than 6 bytes.
   */
  _toValue(dec) {
    let decimal: Decimal;
    if (dec instanceof Decimal) {
      decimal = dec;
    } else {
      // Reject anything that would silently turn into NaN below.
      if (typeof dec !== "number" || !Number.isFinite(dec)) {
        throw new TypeError("invalid decimal");
      }
      const { precision, scale } = this.decimal;
      decimal = new Decimal(
        Math.round(dec * Math.pow(10, scale)),
        precision,
        scale
      );
    }

    const unscaled = decimal.unscaled;
    const type = this.underlyingType as UnderLyingType;
    let buf: Buffer;
    if (avro.Type.isType(type, "fixed")) {
      buf = Buffer.alloc(type.size);
    } else {
      // Find the smallest n such that -2^(8n-1) <= unscaled <= 2^(8n-1) - 1.
      // Unlike a log2-based estimate this also works for 0 and is immune to
      // floating-point rounding. The search stops at 6 bytes, the widest
      // integer writeIntBE can write.
      let numberOfBytes = 1;
      let bound = 128;
      while (numberOfBytes < 6 && (unscaled >= bound || unscaled < -bound)) {
        numberOfBytes++;
        bound *= 256;
      }
      buf = Buffer.alloc(numberOfBytes);
    }
    buf.writeIntBE(unscaled, 0, buf.length);
    return buf;
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
For `DecimalType`, the calls to `new Buffer()` should be changed to `Buffer.alloc()`.