Skip to content

Instantly share code, notes, and snippets.

@koichik
Created October 18, 2011 10:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save koichik/1295138 to your computer and use it in GitHub Desktop.
Save koichik/1295138 to your computer and use it in GitHub Desktop.
string encoder/decoder as a filter
var stream = require('stream');
var util = require('util');
exports.utf8Decoder = function() {
return new Utf8Decoder();
};
exports.utf8Encoder = function() {
return new StringEncoder('utf8');
};
/**
 * Filter stream that accepts Buffers of UTF-8 bytes through write() and
 * emits the decoded text as 'data' events carrying strings.
 *
 * A multi-byte character that is cut off at the end of one chunk is held
 * back and emitted once the following chunk(s) complete it. While the stream
 * is paused, decoded text is buffered and emitted on resume().
 *
 * May be called with or without `new`.
 *
 * @constructor
 */
function Utf8Decoder() {
  if (!(this instanceof Utf8Decoder)) return new Utf8Decoder();
  stream.Stream.call(this);
  this.readable = true;
  this.writable = true;
  this._pausing = false;
  this._destroyed = false;
  // Decoded text that has not been emitted yet.
  this._pending = '';
  // Write callbacks whose data has not been emitted yet.
  this._callbacks = [];
  // Bytes of a character that was cut off at the end of a chunk. A UTF-8
  // character is at most 4 bytes long. Buffer.alloc() replaces the
  // deprecated `new Buffer(size)` and returns zero-filled memory.
  this._partialBuf = Buffer.alloc(4);
  // Total byte length of the cut-off character (0 when there is none).
  this._partialLength = 0;
  // How many of those bytes have been stored in _partialBuf so far.
  this._partialReceived = 0;
}
util.inherits(Utf8Decoder, stream.Stream);

/**
 * Decodes a chunk of UTF-8 bytes.
 *
 * Emits 'error' (and returns true) when `buf` is not a Buffer or when the
 * backward scan of the chunk's tail meets a byte that can never appear in
 * UTF-8. Other malformed sequences are handed to Buffer#toString(), which
 * substitutes U+FFFD for them.
 *
 * @param {Buffer} buf bytes to decode.
 * @param {Function=} callback invoked on a later tick once all text produced
 *     by this chunk has been emitted. When the chunk ends in the middle of a
 *     character (or the stream is paused) the callback is deferred until the
 *     buffered text is emitted.
 * @returns {boolean} false when the stream is paused and holds buffered
 *     text, i.e. the caller should wait for 'drain'; true otherwise.
 * @throws {Error} when the stream has already been ended.
 */
Utf8Decoder.prototype.write = function write(buf, callback) {
  if (!this.writable) {
    throw new Error('not writable');
  }
  if (!Buffer.isBuffer(buf)) {
    this.emit('error', new Error('not Buffer'));
    return true;
  }
  var len = buf.length;
  // Offset of the first byte of `buf` that still has to be decoded.
  var start = 0;
  if (this._partialLength) {
    var remain = this._partialLength - this._partialReceived;
    if (len < remain) {
      // partial char is still incomplete
      buf.copy(this._partialBuf, this._partialReceived, 0, len);
      this._partialReceived += len;
      if (callback) {
        this._callbacks.push(callback);
      }
      return !this._pausing || !this._pending;
    }
    // partial char has completed
    buf.copy(this._partialBuf, this._partialReceived, 0, remain);
    this._pending += this._partialBuf.toString('utf8', 0, this._partialLength);
    this._partialLength = 0;
    this._partialReceived = 0;
    start = remain;
  }
  // Scan backwards from the end of the chunk to the lead byte of the last
  // character, counting how many of its bytes are present.
  var end = len;
  var count = 0;
  var numOfBytes = 0;
  for (; end > start; --end) {
    var byte = buf[end - 1];
    ++count;
    // 0xxx xxxx (single byte)
    if (byte < 0x80) {
      numOfBytes = 1;
      break;
    }
    // 10xx xxxx (continuation bytes)
    if (byte >> 6 === 0x02) {
      continue;
    }
    // 110x xxxx (2 bytes)
    if (byte >> 5 === 0x06) {
      numOfBytes = 2;
      break;
    }
    // 1110 xxxx (3 bytes)
    if (byte >> 4 === 0x0E) {
      numOfBytes = 3;
      break;
    }
    // 1111 0xxx (4 bytes)
    if (byte >> 3 === 0x1E) {
      numOfBytes = 4;
      break;
    }
    this.emit('error', new Error('illegal UTF-8 data ' + byte.toString(16)));
    return true;
  }
  if (count < numOfBytes) {
    // The last character is cut off: stash its bytes and decode only what
    // precedes its lead byte.
    --end;
    buf.copy(this._partialBuf, 0, end, len);
    this._partialLength = numOfBytes;
    this._partialReceived = count;
  } else {
    // The tail is complete (or malformed, e.g. stray continuation bytes):
    // decode the whole remainder and let toString() deal with it.
    end = len;
  }
  if (end > start) {
    this._pending += buf.toString('utf8', start, end);
  }
  // Emits the text and releases the callbacks of earlier writes.
  this._flush();
  if (callback) {
    if (this._pending || this._partialLength) {
      // Some of this chunk is still buffered; wait until it is emitted.
      this._callbacks.push(callback);
    } else {
      process.nextTick(callback);
    }
  }
  return !this._pausing || !this._pending;
};

/**
 * Emits the buffered text as a single 'data' event and releases the queued
 * write callbacks. Does nothing while paused or when nothing is buffered.
 *
 * @returns {boolean} true when a 'data' event was emitted.
 * @private
 */
Utf8Decoder.prototype._flush = function _flush() {
  if (this._pausing || !this._pending) {
    return false;
  }
  // Clear before emitting so a listener that writes again cannot cause the
  // same text to be emitted twice.
  var text = this._pending;
  this._pending = '';
  this.emit('data', text);
  this._notify();
  return true;
};

/**
 * Schedules every queued write callback on the next tick, in the order the
 * writes were made, and empties the queue.
 *
 * @private
 */
Utf8Decoder.prototype._notify = function _notify() {
  while (this._callbacks.length) {
    var callback = this._callbacks.shift();
    if (callback) {
      process.nextTick(callback);
    }
  }
};

/**
 * Stops 'data' events; decoded text is buffered until resume().
 * Emits 'pause'.
 */
Utf8Decoder.prototype.pause = function pause() {
  this._pausing = true;
  this.emit('pause');
};

/**
 * Restarts 'data' events. Emits 'resume', then any buffered text (releasing
 * all queued write callbacks). After buffered text was emitted, emits
 * 'drain' when the stream is still writable, or the deferred 'end' when
 * end() was called while the text was buffered.
 */
Utf8Decoder.prototype.resume = function resume() {
  this._pausing = false;
  this.emit('resume');
  // A 'resume' listener may have paused again; _flush() checks for that.
  if (!this._flush()) {
    return;
  }
  if (this.writable) {
    this.emit('drain');
  } else if (this.readable) {
    this.readable = false;
    this.emit('end');
  }
};

/**
 * Finishes the stream, optionally writing a final chunk first.
 *
 * A character that is still incomplete is decoded as-is, so the truncation
 * shows up as U+FFFD instead of being dropped silently. 'end' is emitted
 * immediately unless text is still buffered because the stream is paused; in
 * that case resume() emits it after the text.
 *
 * @param {Buffer=} buf final chunk to decode.
 * @param {Function=} callback write callback for `buf`; ignored when `buf`
 *     is not given.
 * @returns {boolean} false when buffered text is still waiting for resume().
 * @throws {Error} when `buf` is given and the stream has already been ended.
 */
Utf8Decoder.prototype.end = function end(buf, callback) {
  if (buf) {
    this.write(buf, callback);
  }
  if (this.writable) {
    this.writable = false;
    if (this._partialLength) {
      this._pending += this._partialBuf.toString('utf8', 0, this._partialReceived);
      this._partialLength = 0;
      this._partialReceived = 0;
    }
    this._flush();
    if (this.readable && !this._pending) {
      this.readable = false;
      this.emit('end');
    }
  }
  return !this._pausing || !this._pending;
};

/**
 * Tears the stream down: discards buffered text, partial bytes and queued
 * callbacks, ends the stream (emitting 'end' if it has not been emitted yet)
 * and emits 'destroy'. Calling it again has no effect.
 */
Utf8Decoder.prototype.destroy = function destroy() {
  if (this._destroyed) {
    return;
  }
  this._destroyed = true;
  // Discard first so that nothing can be emitted after 'destroy'.
  this._pending = '';
  this._partialLength = 0;
  this._partialReceived = 0;
  this._callbacks.length = 0;
  this.end();
  if (this.readable) {
    // end() had been called earlier and its 'end' was still deferred.
    this.readable = false;
    this.emit('end');
  }
  this.emit('destroy');
};
/**
 * Filter stream that accepts strings through write() and emits them as
 * 'data' events carrying Buffers encoded with `encoding`.
 *
 * While the stream is paused, encoded chunks are queued and emitted one by
 * one on resume().
 *
 * May be called with or without `new`.
 *
 * @constructor
 * @param {string=} encoding character encoding handed to Buffer.from();
 *     when omitted Buffer.from() applies its own default.
 */
function StringEncoder(encoding) {
  // Forward `encoding`, otherwise calling without `new` would lose it.
  if (!(this instanceof StringEncoder)) return new StringEncoder(encoding);
  stream.Stream.call(this);
  this.readable = true;
  this.writable = true;
  this._pausing = false;
  this._destroyed = false;
  this._encoding = encoding;
  // Encoded chunks queued while paused, and the callback (possibly
  // undefined) of each of them at the same index.
  this._pendings = [];
  this._callbacks = [];
}
util.inherits(StringEncoder, stream.Stream);

/**
 * Encodes a string and emits it as a Buffer, or queues it while paused.
 *
 * The string is encoded immediately, so invalid input is rejected by
 * Buffer.from() at the call site rather than later inside resume().
 *
 * @param {string} s text to encode.
 * @param {Function=} callback invoked on a later tick once the chunk has
 *     been emitted.
 * @returns {boolean} false when the chunk was queued because the stream is
 *     paused (wait for 'drain'); true when it was emitted.
 * @throws {Error} when the stream has already been ended.
 * @throws {TypeError} when Buffer.from() rejects `s` (e.g. a number).
 */
StringEncoder.prototype.write = function write(s, callback) {
  if (!this.writable) {
    throw new Error('not writable');
  }
  // Buffer.from() replaces the deprecated `new Buffer(s, encoding)`, which
  // allocated uninitialised memory when `s` was a number.
  var chunk = Buffer.from(s, this._encoding);
  if (this._pausing) {
    this._pendings.push(chunk);
    this._callbacks.push(callback);
    return false;
  }
  this.emit('data', chunk);
  if (callback) {
    process.nextTick(callback);
  }
  return true;
};

/**
 * Stops 'data' events; written chunks are queued until resume().
 * Emits 'pause'.
 */
StringEncoder.prototype.pause = function pause() {
  this._pausing = true;
  this.emit('pause');
};

/**
 * Restarts 'data' events. Emits 'resume', then the queued chunks in order,
 * stopping early if a listener pauses the stream again. Once the queue has
 * been emptied, emits 'drain' when the stream is still writable, or the
 * deferred 'end' when end() was called while chunks were queued.
 */
StringEncoder.prototype.resume = function resume() {
  this._pausing = false;
  this.emit('resume');
  if (this._pausing || this._pendings.length === 0) {
    return;
  }
  while (!this._pausing && this._pendings.length > 0) {
    var chunk = this._pendings.shift();
    var callback = this._callbacks.shift();
    this.emit('data', chunk);
    if (callback) {
      process.nextTick(callback);
    }
  }
  if (this._pendings.length > 0) {
    // Paused again by a 'data' listener; the rest waits for the next resume.
    return;
  }
  if (this.writable) {
    this.emit('drain');
  } else if (this.readable) {
    this.readable = false;
    this.emit('end');
  }
};

/**
 * Finishes the stream, optionally writing a final string first.
 *
 * 'end' is emitted immediately unless chunks are still queued because the
 * stream is paused; in that case resume() emits it after the last chunk.
 *
 * @param {string=} s final text to encode; an empty string is skipped.
 * @param {Function=} callback write callback for `s`; ignored when `s` is
 *     not given.
 * @returns {boolean} false when queued chunks are still waiting for resume().
 * @throws {Error} when `s` is given and the stream has already been ended.
 */
StringEncoder.prototype.end = function end(s, callback) {
  if (s) {
    this.write(s, callback);
  }
  if (this.writable) {
    this.writable = false;
    if (this.readable && this._pendings.length === 0) {
      this.readable = false;
      this.emit('end');
    }
  }
  return this._pendings.length === 0;
};

/**
 * Tears the stream down: discards queued chunks and their callbacks, ends
 * the stream (emitting 'end' if it has not been emitted yet) and emits
 * 'destroy'. Calling it again has no effect.
 */
StringEncoder.prototype.destroy = function destroy() {
  if (this._destroyed) {
    return;
  }
  this._destroyed = true;
  // Discard first so that nothing can be emitted after 'destroy'.
  this._pendings.length = 0;
  this._callbacks.length = 0;
  this.end();
  if (this.readable) {
    // end() had been called earlier and its 'end' was still deferred.
    this.readable = false;
    this.emit('end');
  }
  this.emit('destroy');
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment