Skip to content

Instantly share code, notes, and snippets.

@calvinmetcalf
Created January 13, 2015 15:35
Show Gist options
  • Save calvinmetcalf/161db5bd9a8679886caa to your computer and use it in GitHub Desktop.
Save calvinmetcalf/161db5bd9a8679886caa to your computer and use it in GitHub Desktop.
WHATWG readable streams translated to ES5
var Promise = global.Promise || require('lie');
/**
 * Do-nothing function; accepts any arguments and returns undefined.
 */
function noop() {
  return undefined;
}
/**
 * Tells whether a value is an object in the language sense: any non-null
 * value whose typeof is 'object', or any function.
 *
 * @param {*} x - Value to classify.
 * @returns {boolean} true for non-null objects and functions, false otherwise.
 */
function typeIsObject(x) {
  // typeof null is 'object', so null has to be excluded explicitly.
  if (x === null) {
    return false;
  }
  var kind = typeof x;
  return kind === 'object' || kind === 'function';
}
/**
 * Placeholder for the exclusive reader constructor. It has no implementation
 * here: every call, with or without `new`, throws.
 *
 * @throws {Error} Always.
 */
function ExclusiveStreamReader() {
  var notImplemented = new Error('not implimented');
  throw notImplemented;
}
/**
 * Creates a readable stream on top of an underlying source.
 *
 * @param {Object} [opts] - Underlying source description; every field is optional.
 * @param {Function} [opts.start] - Called synchronously during construction with
 *   (enqueue, close, error). Its return value is passed through Promise.resolve:
 *   on fulfilment the stream is marked started and a first pull is attempted,
 *   on rejection the stream is errored with the rejection reason.
 * @param {Function} [opts.pull] - Stored as the pull hook; defaults to a no-op.
 * @param {Function} [opts.cancel] - Stored as the cancel hook; defaults to a no-op.
 * @param {Object} [opts.strategy] - Queuing strategy; defaults to
 *   defaultReadableStreamStrategy.
 */
function ReadableStream(opts) {
  // Allow `new ReadableStream()` with no argument instead of failing on `opts.start`.
  opts = opts || {};
  var start = opts.start || noop;
  this._onPull = opts.pull || noop;
  this._onCancel = opts.cancel || noop;
  this._strategy = opts.strategy || defaultReadableStreamStrategy;
  var self = this;
  // Keep the settle functions on the instance so state transitions can settle
  // these promises later from outside the executors.
  this._readyPromise = new Promise(function (resolve) {
    self._resolveReadyPromise = resolve;
  });
  this.closed = new Promise(function (resolve, reject) {
    self._resolveClosedPromise = resolve;
    self._rejectClosedPromise = reject;
  });
  this._queue = [];
  this._state = 'waiting';
  this._started = false;
  this._draining = false;
  this._pulling = false;
  this._reader = undefined;
  // Bound wrappers handed to the underlying source; they do not rely on `this`.
  this._enqueue = function (chunk) {
    return self.__enqueue(chunk);
  };
  this._close = function () {
    return self.__close();
  };
  this._error = function (e) {
    return self.__error(e);
  };
  Promise.resolve(start(this._enqueue, this._close, this._error)).then(
    function () {
      self._started = true;
      // Use `self`: inside this plain function callback `this` is not the stream.
      callReadableStreamPull(self);
    },
    function (r) {
      return self._error(r);
    }
  );
}
/**
 * Puts the stream into the 'errored' state. Only acts when the stream is
 * currently 'waiting' or 'readable'; in any other state it does nothing.
 *
 * From 'waiting' the ready promise is resolved first; from 'readable' the
 * queue is discarded. The error is then stored, the closed promise is
 * rejected with it, and an attached reader (if any) has its lock released.
 *
 * @param {*} e - The error to store and reject the closed promise with.
 */
ReadableStream.prototype.__error = function (e) {
  var previousState = this._state;
  if (previousState !== 'waiting' && previousState !== 'readable') {
    return;
  }
  if (previousState === 'waiting') {
    this._resolveReadyPromise();
  } else {
    this._queue = [];
  }
  this._state = 'errored';
  this._storedError = e;
  this._rejectClosedPromise(e);
  var reader = this._reader;
  if (reader !== undefined) {
    reader.releaseLock();
  }
};
/**
 * Handles a close request from the underlying source.
 *
 * A 'waiting' stream has nothing queued, so its ready promise is resolved and
 * it is closed immediately. A 'readable' stream still holds chunks, so it is
 * only flagged as draining. Other states are ignored.
 *
 * @returns {*} Whatever closeReadableStream returns when closing from
 *   'waiting'; undefined otherwise.
 */
ReadableStream.prototype.__close = function () {
  switch (this._state) {
    case 'waiting':
      this._resolveReadyPromise();
      return closeReadableStream(this);
    case 'readable':
      this._draining = true;
      return undefined;
    default:
      return undefined;
  }
};
/**
 * Adds a chunk to the stream's queue on behalf of the underlying source.
 *
 * @param {*} chunk - The value to queue.
 * @returns {boolean} false when the strategy reports that backpressure should
 *   be applied after this chunk was queued, true otherwise.
 * @throws {*} The stored error when the stream is errored.
 * @throws {TypeError} When the stream is closed or draining.
 * @throws {*} Whatever the strategy's size() throws; the stream is errored
 *   with the same value before it is rethrown.
 */
ReadableStream.prototype.__enqueue = function (chunk) {
  switch (this._state) {
    case 'errored':
      throw this._storedError;
    case 'closed':
      throw new TypeError('stream is closed');
  }
  if (this._draining === true) {
    throw new TypeError('stream is draining');
  }

  var measuredSize;
  try {
    measuredSize = this._strategy.size(chunk);
  } catch (sizeFailure) {
    this._error(sizeFailure);
    throw sizeFailure;
  }

  enqueueValueWithSize(this._queue, chunk, measuredSize);
  // A chunk arrived, so any pull that was in flight is considered answered.
  this._pulling = false;

  var backpressure = shouldReadableStreamApplyBackpressure(this);
  if (this._state === 'waiting') {
    this._state = 'readable';
    this._resolveReadyPromise();
  }
  return backpressure !== true;
};
/**
 * Cancels the stream: drops any queued chunks, closes the stream and forwards
 * the reason to the underlying source's cancel hook.
 *
 * @param {*} reason - Passed unchanged to the cancel hook.
 * @returns {Promise} Rejected with a TypeError when a reader holds the lock;
 *   fulfilled immediately when already closed; rejected with the stored error
 *   when errored; otherwise fulfilled with undefined once the cancel hook's
 *   result settles, or rejected if the hook throws or its result rejects.
 */
ReadableStream.prototype.cancel = function (reason) {
  var stream = this;
  if (stream._reader !== undefined) {
    var lockedError = new TypeError(
      'This stream is locked to a single exclusive reader and cannot be cancelled directly');
    return Promise.reject(lockedError);
  }
  switch (stream._state) {
    case 'closed':
      return Promise.resolve();
    case 'errored':
      return Promise.reject(stream._storedError);
    case 'waiting':
      stream._resolveReadyPromise();
      break;
  }
  stream._queue = [];
  closeReadableStream(stream);
  // Calling the hook inside an executor turns a synchronous throw into a rejection.
  return new Promise(function (resolve) {
    resolve(stream._onCancel(reason));
  }).then(noop);
};
/**
 * Creates an ExclusiveStreamReader for this stream.
 *
 * @returns {ExclusiveStreamReader} A new reader constructed with this stream.
 * @throws {TypeError} When the stream is already closed.
 * @throws {*} The stored error when the stream is errored.
 */
ReadableStream.prototype.getReader = function () {
  switch (this._state) {
    case 'closed':
      throw new TypeError('The stream has already been closed, so a reader cannot be acquired.');
    case 'errored':
      throw this._storedError;
    default:
      return new ExclusiveStreamReader(this);
  }
};
/**
 * Pipes this stream into the writable side of a transform-like pair and hands
 * back the pair's readable side.
 *
 * @param {Object} duplex - Must expose `writable` and `readable`, each an
 *   object or function.
 * @param {Object} [options] - Forwarded unchanged to pipeTo.
 * @returns {*} duplex.readable.
 * @throws {TypeError} When either side of `duplex` is not an object.
 */
ReadableStream.prototype.pipeThrough = function (duplex, options) {
  var writableOk = typeIsObject(duplex.writable);
  if (!writableOk) {
    throw new TypeError('A transform stream must have an writable property that is an object.');
  }
  var readableOk = typeIsObject(duplex.readable);
  if (!readableOk) {
    throw new TypeError('A transform stream must have a readable property that is an object.');
  }
  // The promise returned by pipeTo is intentionally not returned to the caller.
  this.pipeTo(duplex.writable, options);
  return duplex.readable;
};
/**
 * Synchronously removes and returns the next chunk from the queue.
 *
 * When the read empties the queue, the stream either closes (if it was
 * draining) or goes back to 'waiting' with a fresh, unsettled ready promise.
 * A pull from the underlying source is then attempted.
 *
 * @returns {*} The chunk at the head of the queue.
 * @throws {TypeError} When a reader holds the lock, when the stream is
 *   'waiting' (nothing to read yet) or when it is 'closed'.
 * @throws {*} The stored error when the stream is errored.
 * @throws {Error} When the state is unknown or the queue is unexpectedly empty.
 */
ReadableStream.prototype.read = function () {
  if (this._reader !== undefined) {
    throw new TypeError('This stream is locked to a single exclusive reader and cannot be read from directly');
  }
  if (this._state === 'waiting') {
    throw new TypeError('no chunks available (yet)');
  }
  if (this._state === 'closed') {
    throw new TypeError('stream has already been consumed');
  }
  if (this._state === 'errored') {
    throw this._storedError;
  }
  if (this._state !== 'readable') {
    throw new Error('stream state ' + this._state + ' is invalid');
  }
  if (this._queue.length <= 0) {
    throw new Error('there must be chunks available to read');
  }
  var chunk = dequeueValue(this._queue);
  if (this._queue.length === 0) {
    if (this._draining === true) {
      closeReadableStream(this);
    } else {
      this._state = 'waiting';
      // The previous ready promise was resolved when the stream became
      // readable, so a new one is needed for the next wait. It is created the
      // same way the constructor does, since no _initReadyPromise helper exists.
      var self = this;
      this._readyPromise = new Promise(function (resolve) {
        self._resolveReadyPromise = resolve;
      });
    }
  }
  callReadableStreamPull(this);
  return chunk;
};
/**
 * Pipes this stream into a writable destination.
 *
 * A reader is acquired from this stream and chunks are moved to `dest` while
 * the source is 'readable' and the destination is 'writable'. Otherwise the
 * pipe waits on the relevant ready/closed promises and re-evaluates.
 *
 * @param {Object} dest - Destination exposing `state`, `ready`, `closed`,
 *   `write(chunk)`, `close()` and `abort(reason)`.
 * @param {Object} [opts]
 * @param {boolean} [opts.preventClose] - If truthy, `dest` is not closed when the source closes.
 * @param {boolean} [opts.preventAbort] - If truthy, `dest` is not aborted when the source errors.
 * @param {boolean} [opts.preventCancel] - If truthy, the source is not cancelled
 *   (only unlocked) when `dest` errors or closes.
 * @returns {Promise} Settles with the outcome of `dest.close()` (or fulfils
 *   directly when preventClose is set) once the source closes; rejects when
 *   either side fails or the destination can no longer be written to.
 */
ReadableStream.prototype.pipeTo = function (dest, opts) {
  opts = opts || {};
  var preventClose = opts.preventClose;
  var preventAbort = opts.preventAbort;
  var preventCancel = opts.preventCancel;
  // Captured because `this` inside the Promise executor below is not the stream.
  var self = this;
  var source;
  var resolvePipeToPromise;
  var rejectPipeToPromise;
  return new Promise(function (resolve, reject) {
    resolvePipeToPromise = resolve;
    rejectPipeToPromise = reject;
    source = self.getReader();
    doPipe();
  });
  // Inspects both states and either moves data, waits, or finishes the pipe.
  function doPipe() {
    var ds = dest.state;
    switch (ds) {
      case 'writable':
        while (source.state === 'readable' && dest.state === 'writable') {
          dest.write(source.read());
        }
        if (dest.state !== 'writable') {
          // The destination changed state while being written to (e.g. it
          // started applying backpressure). Re-dispatch on its new state;
          // otherwise a still-readable source would match no branch below
          // and the pipe would stall.
          doPipe();
          return;
        }
        if (source.state === 'waiting') {
          Promise.race([source.ready, dest.closed]).then(doPipe, doPipe);
        } else if (source.state === 'errored') {
          source.closed.catch(abortDest);
        } else if (source.state === 'closed') {
          closeDest();
        }
        return;
      case 'waiting':
        if (source.state === 'readable') {
          Promise.race([source.closed, dest.ready]).then(doPipe, doPipe);
        } else if (source.state === 'waiting') {
          Promise.race([source.ready, dest.ready]).then(doPipe);
        } else if (source.state === 'errored') {
          source.closed.catch(abortDest);
        } else if (source.state === 'closed') {
          closeDest();
        }
        return;
      case 'errored':
        if (source.state === 'readable' || source.state === 'waiting') {
          dest.closed.catch(cancelSource);
        }
        return;
      case 'closing':
      case 'closed':
        if (source.state === 'readable' || source.state === 'waiting') {
          cancelSource(new TypeError('destination is closing or closed and cannot be piped to anymore'));
        }
        return;
    }
  }
  // Stops reading from the source and rejects the pipe with `reason`.
  function cancelSource(reason) {
    if (!preventCancel) {
      // implicitly releases the lock
      source.cancel(reason);
    } else {
      source.releaseLock();
    }
    rejectPipeToPromise(reason);
  }
  // Source finished normally: unlock it and close the destination unless told not to.
  function closeDest() {
    source.releaseLock();
    if (!preventClose) {
      dest.close().then(resolvePipeToPromise, rejectPipeToPromise);
    } else {
      resolvePipeToPromise();
    }
  }
  // Source failed: unlock it, abort the destination unless told not to, and reject.
  function abortDest(reason) {
    source.releaseLock();
    if (!preventAbort) {
      dest.abort(reason);
    }
    rejectPipeToPromise(reason);
  }
};
/**
 * `ready` accessor: the promise consumers wait on for the stream to leave the
 * 'waiting' state. While a reader is attached it returns the reader's
 * `_lockReleased` value instead of the stream's own ready promise.
 */
Object.defineProperty(ReadableStream.prototype, 'ready', {
  get: function () {
    if (this._reader !== undefined) {
      return this._reader._lockReleased;
    }
    return this._readyPromise;
  },
  // The descriptor key must be spelled `enumerable`; a misspelled key is
  // ignored and the property silently defaults to non-enumerable.
  enumerable: true
});
/**
 * `state` accessor: the stream's current state string. While a reader is
 * attached it always reports 'waiting', whatever the internal state is.
 */
Object.defineProperty(ReadableStream.prototype, 'state', {
  get: function () {
    if (this._reader !== undefined) {
      return 'waiting';
    }
    return this._state;
  },
  // The descriptor key must be spelled `enumerable`; a misspelled key is
  // ignored and the property silently defaults to non-enumerable.
  enumerable: true
});
/**
 * Queuing strategy used when a stream is created without one: every chunk
 * counts as size 1 and backpressure applies once the total exceeds 1.
 */
var defaultReadableStreamStrategy = {
  /**
   * @param {number} queueSize - Total size of the queued chunks.
   * @returns {boolean} true when queueSize is greater than 1.
   * @throws {Error} When queueSize is not a number or is NaN.
   */
  shouldApplyBackpressure: function (queueSize) {
    var isUsableNumber = typeof queueSize === 'number' && !isNaN(queueSize);
    if (!isUsableNumber) {
      throw new Error('invalid queue size');
    }
    return queueSize > 1;
  },
  /**
   * @returns {number} Always 1, regardless of the chunk.
   */
  size: function () {
    return 1;
  }
};
/**
 * Asks the underlying source for more data by calling the stream's pull hook
 * with (enqueue, close, error).
 *
 * Nothing happens when a pull is already in flight, the stream is draining,
 * has not started yet, is closed or errored, or when the strategy reports
 * that backpressure should be applied.
 *
 * @param {ReadableStream} stream
 * @throws {*} Whatever the pull hook or the strategy throws; the stream is
 *   errored with the same value before it is rethrown.
 */
function callReadableStreamPull(stream) {
  if (stream._pulling === true || stream._draining === true || stream._started === false ||
      stream._state === 'closed' || stream._state === 'errored') {
    return;
  }
  var shouldApplyBackpressure = shouldReadableStreamApplyBackpressure(stream);
  if (shouldApplyBackpressure === true) {
    return;
  }
  // Set before the hook runs so a re-entrant call does not pull twice.
  stream._pulling = true;
  try {
    stream._onPull(
      stream._enqueue,
      stream._close,
      stream._error
    );
  } catch (pullResultE) {
    stream._error(pullResultE);
    throw pullResultE;
  }
}

/**
 * Asks the stream's strategy whether backpressure should be applied for the
 * current total queue size.
 *
 * @param {ReadableStream} stream
 * @returns {*} The strategy's answer, unmodified.
 * @throws {*} Whatever the strategy throws; the stream is errored first.
 */
function shouldReadableStreamApplyBackpressure(stream) {
  var queueSize = getTotalQueueSize(stream._queue);
  try {
    return stream._strategy.shouldApplyBackpressure(queueSize);
  } catch (shouldApplyBackpressureE) {
    stream._error(shouldApplyBackpressureE);
    throw shouldApplyBackpressureE;
  }
}

/**
 * Sums the `size` of every { value, size } pair in the queue.
 *
 * @param {Array<{value: *, size: number}>} queue
 * @returns {number} Total size; 0 for an empty queue.
 * @throws {Error} When a pair's size is not a finite number.
 */
function getTotalQueueSize(queue) {
  return queue.reduce(function (total, pair) {
    // typeof + global isFinite rejects non-numbers, NaN and both infinities
    // without relying on the ES2015 Number.isNaN.
    if (!(typeof pair.size === 'number' && isFinite(pair.size))) {
      throw new Error('Spec-level failure: should never find an invalid size in the queue.');
    }
    return total + pair.size;
  }, 0);
}

/**
 * Removes the pair at the head of the queue and returns its value.
 *
 * @param {Array<{value: *, size: number}>} queue - Mutated in place.
 * @returns {*} The dequeued value.
 * @throws {Error} When the queue is empty.
 */
function dequeueValue(queue) {
  if (queue.length <= 0) {
    throw new Error('Spec-level failure: should never dequeue from an empty queue.');
  }
  var pair = queue.shift();
  return pair.value;
}
/**
 * Appends a { value, size } pair to the queue after coercing the size to a
 * number.
 *
 * @param {Array} queue - Mutated in place.
 * @param {*} value - The chunk to store.
 * @param {*} size - Coerced with Number(); must come out finite.
 * @throws {RangeError} When the coerced size is NaN or infinite.
 */
export function enqueueValueWithSize(queue, value, size) {
  var numericSize = Number(size);
  // Global isFinite is false for NaN, +Infinity and -Infinity.
  if (!isFinite(numericSize)) {
    throw new RangeError('Size must be a finite, non-NaN number.');
  }
  queue.push({ value: value, size: numericSize });
}
/**
 * Marks the stream as closed, resolves its closed promise and, when a reader
 * is attached, releases that reader's lock.
 *
 * @param {ReadableStream} stream
 */
function closeReadableStream(stream) {
  stream._state = 'closed';
  stream._resolveClosedPromise();
  var reader = stream._reader;
  if (reader === undefined) {
    return;
  }
  reader.releaseLock();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment