Skip to content

Instantly share code, notes, and snippets.

@domenic
Created April 28, 2015 14:36
Show Gist options
  • Save domenic/2fbc9719d72316b04166 to your computer and use it in GitHub Desktop.
Save domenic/2fbc9719d72316b04166 to your computer and use it in GitHub Desktop.
ReadableStream.js
(function() {
'use strict';
// Property keys for the internal state ("slots") of the objects defined in this file. Each key is created with
// the V8 natives-syntax runtime call %CreatePrivateOwnSymbol; the string argument is the symbol's description.
// NOTE(review): presumably these symbols are not observable from user code -- confirm against the V8 runtime.

// Slots stored on ReadableStream instances.
// NOTE(review): readableStreamClosedPromise is not referenced anywhere else in the code visible here.
const readableStreamClosedPromise = %CreatePrivateOwnSymbol('[[closedPromise]]');
const readableStreamCloseRequested = %CreatePrivateOwnSymbol('[[closeRequested]]');
const readableStreamController = %CreatePrivateOwnSymbol('[[controller]]');
const readableStreamPullAgain = %CreatePrivateOwnSymbol('[[pullAgain]]');
const readableStreamPulling = %CreatePrivateOwnSymbol('[[pulling]]');
const readableStreamQueue = %CreatePrivateOwnSymbol('[[queue]]');
const readableStreamReader = %CreatePrivateOwnSymbol('[[reader]]');
const readableStreamStarted = %CreatePrivateOwnSymbol('[[started]]');
const readableStreamState = %CreatePrivateOwnSymbol('[[state]]');
const readableStreamStoredError = %CreatePrivateOwnSymbol('[[storedError]]');
const readableStreamStrategySize = %CreatePrivateOwnSymbol('[[strategySize]]');
const readableStreamStrategyHWM = %CreatePrivateOwnSymbol('[[strategyHWM]]');
const readableStreamUnderlyingSource = %CreatePrivateOwnSymbol('[[underlyingSource]]');
// Slot stored on ReadableStreamController instances: the stream the controller drives.
const readableStreamControllerControlledReadableStream = %CreatePrivateOwnSymbol('[[controlledReadableStream]]');
// Slots stored on ReadableStreamReader instances. The closed promise's resolve/reject functions are kept in
// their own slots so the promise can be settled from outside its executor.
const readableStreamReaderClosedPromise = %CreatePrivateOwnSymbol('[[closedPromise]]');
const readableStreamReaderClosedPromise_resolve = %CreatePrivateOwnSymbol('[[closedPromise]] resolve');
const readableStreamReaderClosedPromise_reject = %CreatePrivateOwnSymbol('[[closedPromise]] reject');
const readableStreamReaderOwnerReadableStream = %CreatePrivateOwnSymbol('[[ownerReadableStream]]');
const readableStreamReaderReadRequests = %CreatePrivateOwnSymbol('[[readRequests]]');
const readableStreamReaderState = %CreatePrivateOwnSymbol('[[state]]');
const readableStreamReaderStoredError = %CreatePrivateOwnSymbol('[[storedError]]');
// Key under which a queue array stores the running total of the sizes of its entries.
const queueSize = %CreatePrivateOwnSymbol('Queue-with-sizes queue size');
// Keys of a read request record: the promise handed to the caller of read(), plus its resolve/reject functions.
const readRequestPromise = %CreatePrivateOwnSymbol('[[promise]]');
const readRequestResolve = %CreatePrivateOwnSymbol('[[promise]] resolve');
const readRequestReject = %CreatePrivateOwnSymbol('[[promise]] reject');
// Local copies of the built-ins used by this file, taken once while this script runs so the code below does
// not look them up again later.
// TODO(domenic): come up with a better way of getting at intrinsics
const Number = global.Number;
const TypeError = global.TypeError;
const RangeError = global.RangeError;
const Promise = global.Promise;
const Number_isNaN = Number.isNaN;
// Promise.resolve / Promise.reject pre-bound to the captured Promise constructor.
const Promise_resolve = Promise.resolve.bind(Promise);
const Promise_reject = Promise.reject.bind(Promise);
// uncurryThis(f) returns Function.prototype.call bound to f, i.e. a function g such that
// g(thisArg, ...args) invokes f with `this` set to thisArg.
// Note: Function and Array are referenced directly here rather than through `global`.
const uncurryThis = Function.prototype.bind.bind(Function.prototype.call);
// applyFunction(fn, thisArg, argsArray) behaves like fn.apply(thisArg, argsArray).
const applyFunction = uncurryThis(Function.prototype.apply);
// thenPromise(promise, onFulfilled, onRejected) behaves like promise.then(onFulfilled, onRejected).
const thenPromise = uncurryThis(Promise.prototype.then);
// shiftArray(array) / pushArray(array, item) behave like array.shift() / array.push(item).
const shiftArray = uncurryThis(Array.prototype.shift);
const pushArray = uncurryThis(Array.prototype.push);
// TODO(domenic): need to censor Function.prototype.toString for these; use V8 API presumably
// A stream of chunks fed by an "underlying source" object. All state lives in private-symbol slots on the
// instance; the real work is done by the abstract operations defined further down in this file.
class ReadableStream {
  // Initializes every internal slot, creates the stream's controller and calls the underlying source's
  // `start` method (when it has one) with that controller.
  //
  // underlyingSource: object whose methods drive the stream; `{}` is used when it is undefined.
  // strategy: object from which `size` and `highWaterMark` are read; `{}` is used when it is undefined, and
  //     a high water mark of 1 is used when `highWaterMark` is undefined.
  // Throws whatever ValidateAndNormalizeQueuingStrategy or a synchronous `start` call throws.
  constructor(underlyingSource, strategy) {
    const source = underlyingSource === undefined ? {} : underlyingSource;
    const rawStrategy = strategy === undefined ? {} : strategy;

    // `size` is read before `highWaterMark`; both are validated together.
    const rawSize = rawStrategy.size;
    const rawHighWaterMark = rawStrategy.highWaterMark;
    const normalizedStrategy = ValidateAndNormalizeQueuingStrategy(
        rawSize, rawHighWaterMark === undefined ? 1 : rawHighWaterMark);

    this[readableStreamUnderlyingSource] = source;

    // TODO(domenic) use a real queue data structure
    const chunkQueue = [];
    chunkQueue[queueSize] = 0;
    this[readableStreamQueue] = chunkQueue;

    // TODO(domenic) consolidate booleans into a bit field?
    // TODO(domenic) use integers for state? (or put in bit field?)
    this[readableStreamState] = 'readable';
    this[readableStreamStarted] = false;
    this[readableStreamCloseRequested] = false;
    this[readableStreamPulling] = false;
    this[readableStreamPullAgain] = false;
    this[readableStreamReader] = undefined;
    this[readableStreamStoredError] = undefined;
    this[readableStreamStrategySize] = normalizedStrategy.size;
    this[readableStreamStrategyHWM] = normalizedStrategy.highWaterMark;

    // The controller slot must still be unset while the controller is constructed.
    const controller = new ReadableStreamController(this);
    this[readableStreamController] = controller;

    const startResult = InvokeOrNoop(source, 'start', [controller]);
    thenPromise(
        Promise_resolve(startResult),
        () => {
          // Start finished: mark the stream started and see whether a pull is wanted.
          this[readableStreamStarted] = true;
          RequestReadableStreamPull(this);
        },
        (reason) => {
          // Start failed: error the stream unless it already left the readable state.
          if (this[readableStreamState] !== 'readable') {
            return undefined;
          }
          return ErrorReadableStream(this, reason);
        });
  }

  // Cancels the stream with the given reason.
  // Returns a promise; it is rejected with a TypeError when `this` is not a ReadableStream or when the
  // stream currently has a reader, otherwise it is the result of CancelReadableStream.
  cancel(reason) {
    if (IsReadableStream(this) === false) {
      const brandError = new TypeError(
          'ReadableStream.prototype.cancel can only be used on a ReadableStream');
      return Promise_reject(brandError);
    }

    if (IsReadableStreamLocked(this) === true) {
      const lockedError = new TypeError(
          'Cannot cancel a stream that already has a reader');
      return Promise_reject(lockedError);
    }

    return CancelReadableStream(this, reason);
  }

  // Creates a reader for this stream via AcquireReadableStreamReader.
  // Throws a TypeError when `this` is not a ReadableStream.
  getReader() {
    if (IsReadableStream(this) === false) {
      throw new TypeError('ReadableStream.prototype.getReader can only be used on a ReadableStream');
    }

    const reader = AcquireReadableStreamReader(this);
    return reader;
  }
}
// Handle passed to the underlying source's methods; it lets the source enqueue chunks into, close, or error
// the stream it belongs to.
class ReadableStreamController {
  // stream: the ReadableStream to control.
  // Throws a TypeError when `stream` is not a ReadableStream, or when that stream already has a controller
  // (so only the ReadableStream constructor can successfully create one).
  constructor(stream) {
    if (IsReadableStream(stream) === false) {
      throw new TypeError('ReadableStreamController can only be constructed with a ReadableStream instance');
    }

    const existingController = stream[readableStreamController];
    if (existingController !== undefined) {
      throw new TypeError(
          'ReadableStreamController instances can only be created by the ReadableStream constructor');
    }

    this[readableStreamControllerControlledReadableStream] = stream;
  }

  // Result of GetReadableStreamDesiredSize for the controlled stream.
  // Throws a TypeError when `this` is not a ReadableStreamController.
  get desiredSize() {
    if (IsReadableStreamController(this) === false) {
      throw new TypeError(
          'ReadableStreamController.prototype.desiredSize can only be used on a ReadableStreamController');
    }

    const controlled = this[readableStreamControllerControlledReadableStream];
    return GetReadableStreamDesiredSize(controlled);
  }

  // Requests that the controlled stream be closed.
  // Throws a TypeError when `this` is not a controller, when a close was already requested, or when the
  // stream is errored (checked in that order).
  close() {
    if (IsReadableStreamController(this) === false) {
      throw new TypeError(
          'ReadableStreamController.prototype.close can only be used on a ReadableStreamController');
    }

    const controlled = this[readableStreamControllerControlledReadableStream];
    const alreadyRequested = controlled[readableStreamCloseRequested] === true;
    if (alreadyRequested) {
      throw new TypeError('The stream has already been closed; do not close it again!');
    }

    const isErrored = controlled[readableStreamState] === 'errored';
    if (isErrored) {
      throw new TypeError('The stream is in an errored state and cannot be closed');
    }

    return CloseReadableStream(controlled);
  }

  // Hands `chunk` to the controlled stream.
  // Throws a TypeError when `this` is not a controller, rethrows the stream's stored error when the stream
  // is errored, and throws a TypeError when a close has already been requested.
  enqueue(chunk) {
    if (IsReadableStreamController(this) === false) {
      throw new TypeError(
          'ReadableStreamController.prototype.enqueue can only be used on a ReadableStreamController');
    }

    const controlled = this[readableStreamControllerControlledReadableStream];
    const isErrored = controlled[readableStreamState] === 'errored';
    if (isErrored) {
      throw controlled[readableStreamStoredError];
    }

    const closing = controlled[readableStreamCloseRequested] === true;
    if (closing) {
      throw new TypeError('stream is closed or draining');
    }

    return EnqueueInReadableStream(controlled, chunk);
  }

  // Puts the controlled stream into the errored state with `e` as the stored error.
  // Throws a TypeError when `this` is not a controller or when the stream is no longer readable.
  error(e) {
    if (IsReadableStreamController(this) === false) {
      throw new TypeError(
          'ReadableStreamController.prototype.error can only be used on a ReadableStreamController');
    }

    const controlled = this[readableStreamControllerControlledReadableStream];
    const currentState = controlled[readableStreamState];
    if (currentState === 'readable') {
      return ErrorReadableStream(controlled, e);
    }

    throw new TypeError(`The stream is ${currentState} and so cannot be errored`);
  }
}
// Exclusive reader for a ReadableStream. While a reader is attached, the stream counts as locked.
class ReadableStreamReader {
  // stream: the ReadableStream to lock and read from.
  // Throws a TypeError when `stream` is not a ReadableStream or is already locked by another reader.
  // If the stream is already closed or errored, the new reader is released again right away.
  constructor(stream) {
    if (IsReadableStream(stream) === false) {
      throw new TypeError('ReadableStreamReader can only be constructed with a ReadableStream instance');
    }
    if (IsReadableStreamLocked(stream) === true) {
      throw new TypeError('This stream has already been locked for exclusive reading by another reader');
    }

    // Link the stream and the reader to each other.
    stream[readableStreamReader] = this;
    this[readableStreamReaderOwnerReadableStream] = stream;

    // TODO(domenic): use integers for state?
    this[readableStreamReaderState] = 'readable';
    this[readableStreamReaderStoredError] = undefined;

    // TODO(domenic): use a real queue data structure
    this[readableStreamReaderReadRequests] = [];

    // TODO(domenic): use faster means of creating/resolving/rejecting promises
    // The settle functions are stored on the reader so the promise can be settled when it is released.
    this[readableStreamReaderClosedPromise] = new Promise((resolve, reject) => {
      this[readableStreamReaderClosedPromise_resolve] = resolve;
      this[readableStreamReaderClosedPromise_reject] = reject;
    });

    switch (stream[readableStreamState]) {
      case 'closed':
      case 'errored':
        ReleaseReadableStreamReader(this);
        break;
      default:
        break;
    }
  }

  // The reader's closed promise; a promise rejected with a TypeError when `this` is not a reader.
  get closed() {
    if (IsReadableStreamReader(this) === false) {
      const brandError =
          new TypeError('ReadableStreamReader.prototype.closed can only be used on a ReadableStreamReader');
      return Promise_reject(brandError);
    }

    return this[readableStreamReaderClosedPromise];
  }

  // Cancels the owning stream with `reason`.
  // Returns a promise: rejected with a TypeError for a non-reader, resolved with undefined when the reader
  // is closed, rejected with the stored error when it is errored, else the result of CancelReadableStream.
  cancel(reason) {
    if (IsReadableStreamReader(this) === false) {
      const brandError =
          new TypeError('ReadableStreamReader.prototype.cancel can only be used on a ReadableStreamReader');
      return Promise_reject(brandError);
    }

    switch (this[readableStreamReaderState]) {
      case 'closed':
        return Promise_resolve(undefined);
      case 'errored':
        return Promise_reject(this[readableStreamReaderStoredError]);
      default:
        return CancelReadableStream(this[readableStreamReaderOwnerReadableStream], reason);
    }
  }

  // Reads the next chunk via ReadFromReadableStreamReader; a promise rejected with a TypeError when `this`
  // is not a reader.
  read() {
    if (IsReadableStreamReader(this) === false) {
      const brandError =
          new TypeError('ReadableStreamReader.prototype.read can only be used on a ReadableStreamReader');
      return Promise_reject(brandError);
    }

    return ReadFromReadableStreamReader(this);
  }

  // Detaches this reader from its stream.
  // Does nothing when the reader no longer has an owner stream. Throws a TypeError when `this` is not a
  // reader or when read requests are still pending.
  releaseLock() {
    if (IsReadableStreamReader(this) === false) {
      throw new TypeError(
          'ReadableStreamReader.prototype.releaseLock can only be used on a ReadableStreamReader');
    }

    const owner = this[readableStreamReaderOwnerReadableStream];
    if (owner === undefined) {
      return undefined;
    }

    // TODO(domenic): is getting array lengths safe? I don't think so.
    // Might become moot if we have a better data structure.
    const pendingReadCount = this[readableStreamReaderReadRequests].length;
    if (pendingReadCount > 0) {
      throw new TypeError(
          'Tried to release a reader lock when that reader has pending read() calls un-settled');
    }

    return ReleaseReadableStreamReader(this);
  }
}
//
// Readable stream abstract operations
//
// Creates a new ReadableStreamReader for `stream` and returns it. Any exception thrown by the
// ReadableStreamReader constructor propagates to the caller.
function AcquireReadableStreamReader(stream) {
  const reader = new ReadableStreamReader(stream);
  return reader;
}
// Cancels `stream`: discards everything still queued, closes the stream, and forwards `reason` to the
// underlying source's `cancel` method (when it has one).
//
// Returns a promise:
//   - resolved with undefined when the stream is already closed;
//   - rejected with the stored error when the stream is errored;
//   - otherwise derived from the underlying source's cancel result, but always fulfilling with undefined
//     (a rejection from the source passes through, since no rejection handler is attached).
function CancelReadableStream(stream, reason) {
  const state = stream[readableStreamState];
  if (state === 'closed') {
    return Promise_resolve(undefined);
  }
  if (state === 'errored') {
    return Promise_reject(stream[readableStreamStoredError]);
  }

  // Replace the queue with an empty one. The running-total slot must be initialized as well: a bare `[]`
  // has no total, so GetTotalQueueSize would return undefined and the desired size would become NaN.
  const emptyQueue = [];
  emptyQueue[queueSize] = 0;
  stream[readableStreamQueue] = emptyQueue;

  FinishClosingReadableStream(stream);

  const underlyingSource = stream[readableStreamUnderlyingSource];
  const sourceCancelPromise = PromiseInvokeOrNoop(underlyingSource, 'cancel', [reason]);
  // Hide whatever value the source's cancel() produced.
  return thenPromise(sourceCancelPromise, function() { return undefined; });
}
// Records that a close was requested for `stream`. When nothing is queued the stream is closed right away
// and the result of FinishClosingReadableStream is returned; otherwise (or when the stream is already
// closed) the function returns undefined.
function CloseReadableStream(stream) {
  if (stream[readableStreamState] !== 'closed') {
    stream[readableStreamCloseRequested] = true;

    const queuedCount = stream[readableStreamQueue].length;
    if (queuedCount === 0) {
      return FinishClosingReadableStream(stream);
    }
  }

  return undefined;
}
// Delivers `chunk` to `stream`.
//
// - A closed stream ignores the chunk.
// - If the stream has a reader with a pending read request, the oldest request is resolved with the chunk.
// - Otherwise the chunk is measured with the strategy's size function (size 1 when there is none) and put
//   on the stream's queue. If measuring or queueing throws, the stream is errored with that exception and
//   the exception is rethrown.
// After a successful delivery a pull is requested. Always returns undefined.
function EnqueueInReadableStream(stream, chunk) {
  if (stream[readableStreamState] === 'closed') {
    return undefined;
  }

  const hasPendingRead = IsReadableStreamLocked(stream) === true &&
      stream[readableStreamReader][readableStreamReaderReadRequests].length > 0;

  if (hasPendingRead) {
    // Hand the chunk straight to the oldest waiting read().
    const pendingReads = stream[readableStreamReader][readableStreamReaderReadRequests];
    const oldestRequest = shiftArray(pendingReads);
    oldestRequest[readRequestResolve](CreateIterResultObject(chunk, false));
  } else {
    const measureChunk = stream[readableStreamStrategySize];
    try {
      let measuredSize = 1;
      if (measureChunk !== undefined) {
        measuredSize = measureChunk(chunk);
      }
      EnqueueValueWithSize(stream[readableStreamQueue], chunk, measuredSize);
    } catch (enqueueFailure) {
      // A failing size function or an unusable size both error the stream before propagating.
      ErrorReadableStream(stream, enqueueFailure);
      throw enqueueFailure;
    }
  }

  RequestReadableStreamPull(stream);
  return undefined;
}
// Moves `stream` into the errored state: discards everything queued, stores `e` as the stream's error and,
// when a reader is attached, releases it.
//
// Returns the result of ReleaseReadableStreamReader when the stream is locked, otherwise undefined.
function ErrorReadableStream(stream, e) {
  // Replace the queue with an empty one. The running-total slot must be initialized as well: a bare `[]`
  // has no total, so GetTotalQueueSize would return undefined and the desired size would become NaN.
  const emptyQueue = [];
  emptyQueue[queueSize] = 0;
  stream[readableStreamQueue] = emptyQueue;

  stream[readableStreamStoredError] = e;
  stream[readableStreamState] = 'errored';

  if (IsReadableStreamLocked(stream) === true) {
    return ReleaseReadableStreamReader(stream[readableStreamReader]);
  }
  return undefined;
}
// Marks `stream` as closed and, when a reader is attached, releases it.
// Returns the result of ReleaseReadableStreamReader for a locked stream, otherwise undefined.
function FinishClosingReadableStream(stream) {
  stream[readableStreamState] = 'closed';

  if (IsReadableStreamLocked(stream) !== true) {
    return undefined;
  }

  const attachedReader = stream[readableStreamReader];
  return ReleaseReadableStreamReader(attachedReader);
}
// Returns the stream's high water mark minus the total size currently held in its queue.
function GetReadableStreamDesiredSize(stream) {
  const queuedTotal = GetTotalQueueSize(stream[readableStreamQueue]);
  const highWaterMark = stream[readableStreamStrategyHWM];
  return highWaterMark - queuedTotal;
}
// Brand check for ReadableStream: reports whether `x` has the [[underlyingSource]] private symbol as an own
// property. The ReadableStream constructor sets that slot on every instance it initializes.
// The check uses the V8 natives-syntax runtime call %HasOwnProperty.
// NOTE(review): callers compare the result with `=== false`, so this presumably returns a boolean -- confirm.
function IsReadableStream(x) {
// TODO(domenic): is it safe to allow this to be called on non-objects?
return %HasOwnProperty(x, readableStreamUnderlyingSource);
}
// Reports whether `stream` currently has a reader attached, i.e. whether its reader slot holds anything
// other than undefined.
function IsReadableStreamLocked(stream) {
  const attachedReader = stream[readableStreamReader];
  return attachedReader !== undefined;
}
// Brand check for ReadableStreamController: reports whether `x` has the [[controlledReadableStream]] private
// symbol as an own property. The ReadableStreamController constructor sets that slot on success.
// The check uses the V8 natives-syntax runtime call %HasOwnProperty.
// NOTE(review): callers compare the result with `=== false`, so this presumably returns a boolean -- confirm.
function IsReadableStreamController(x) {
return %HasOwnProperty(x, readableStreamControllerControlledReadableStream);
}
// Brand check for ReadableStreamReader: reports whether `x` has the [[ownerReadableStream]] private symbol
// as an own property. The slot is set by the ReadableStreamReader constructor and is later assigned
// undefined (not deleted) when the reader is released.
// The check uses the V8 natives-syntax runtime call %HasOwnProperty.
// NOTE(review): callers compare the result with `=== false`, so this presumably returns a boolean -- confirm.
function IsReadableStreamReader(x) {
return %HasOwnProperty(x, readableStreamReaderOwnerReadableStream);
}
// Produces the next read result for `reader` as a promise for a `{ value, done }` object.
//
// - Closed reader: resolves with `{ value: undefined, done: true }`.
// - Errored reader: rejects with the reader's stored error.
// - Chunks queued on the owner stream: dequeues one and resolves with it. If that emptied the queue and a
//   close was requested, the stream is closed; otherwise a pull is requested.
// - Nothing queued: registers a read request on the reader, requests a pull, and returns the request's
//   still-pending promise.
function ReadFromReadableStreamReader(reader) {
  switch (reader[readableStreamReaderState]) {
    case 'closed':
      return Promise_resolve(CreateIterResultObject(undefined, true));
    case 'errored':
      return Promise_reject(reader[readableStreamReaderStoredError]);
    default:
      break;
  }

  const stream = reader[readableStreamReaderOwnerReadableStream];
  const chunks = stream[readableStreamQueue];

  if (chunks.length === 0) {
    // Nothing buffered: park a request that a later enqueue or release will settle.
    const request = {};
    request[readRequestPromise] = new Promise((resolve, reject) => {
      request[readRequestResolve] = resolve;
      request[readRequestReject] = reject;
    });
    pushArray(reader[readableStreamReaderReadRequests], request);
    RequestReadableStreamPull(stream);
    return request[readRequestPromise];
  }

  const chunk = DequeueValue(chunks);
  const drainedWhileClosing =
      stream[readableStreamCloseRequested] === true && chunks.length === 0;
  if (drainedWhileClosing) {
    FinishClosingReadableStream(stream);
  } else {
    RequestReadableStreamPull(stream);
  }
  return Promise_resolve(CreateIterResultObject(chunk, false));
}
// Detaches `reader` from its owner stream and settles everything the reader still has outstanding.
//
// - Owner stream errored: the reader becomes errored with the stream's stored error; its closed promise
//   and every pending read request are rejected with that error.
// - Otherwise: the reader becomes closed; its closed promise is resolved with undefined and every pending
//   read request is resolved with `{ value: undefined, done: true }`.
// Finally the read request list is emptied and the stream/reader links are cleared in both directions.
function ReleaseReadableStreamReader(reader) {
  const stream = reader[readableStreamReaderOwnerReadableStream];
  const pendingReads = reader[readableStreamReaderReadRequests];

  if (stream[readableStreamState] !== 'errored') {
    reader[readableStreamReaderState] = 'closed';
    reader[readableStreamReaderClosedPromise_resolve](undefined);
    for (const pendingRead of pendingReads) {
      pendingRead[readRequestResolve](CreateIterResultObject(undefined, true));
    }
  } else {
    const storedError = stream[readableStreamStoredError];
    reader[readableStreamReaderState] = 'errored';
    reader[readableStreamReaderStoredError] = storedError;
    reader[readableStreamReaderClosedPromise_reject](storedError);
    for (const pendingRead of pendingReads) {
      pendingRead[readRequestReject](storedError);
    }
  }

  reader[readableStreamReaderReadRequests] = [];
  stream[readableStreamReader] = undefined;
  reader[readableStreamReaderOwnerReadableStream] = undefined;
}
// Calls the underlying source's `pull` method when ShouldReadableStreamPull says the stream wants more data.
//
// Only one pull is in flight at a time: a request made while a pull is pending just sets the pull-again
// flag, and the pull is repeated once the pending one fulfills. If the pull rejects while the stream is
// still readable, the stream is errored with the rejection reason. Always returns undefined.
function RequestReadableStreamPull(stream) {
  if (ShouldReadableStreamPull(stream) === false) {
    return undefined;
  }

  if (stream[readableStreamPulling] === true) {
    // A pull is already running; remember to pull once more afterwards.
    stream[readableStreamPullAgain] = true;
    return undefined;
  }

  stream[readableStreamPulling] = true;

  const onPullFulfilled = () => {
    stream[readableStreamPulling] = false;
    if (stream[readableStreamPullAgain] !== true) {
      return undefined;
    }
    stream[readableStreamPullAgain] = false;
    return RequestReadableStreamPull(stream);
  };

  const onPullRejected = (reason) => {
    if (stream[readableStreamState] !== 'readable') {
      return undefined;
    }
    return ErrorReadableStream(stream, reason);
  };

  const pullPromise = PromiseInvokeOrNoop(
      stream[readableStreamUnderlyingSource], 'pull', [stream[readableStreamController]]);
  thenPromise(pullPromise, onPullFulfilled, onPullRejected);
  return undefined;
}
// Decides whether the underlying source should be asked for more data.
//
// Returns false when the stream is closed or errored, when a close was requested, or when the stream has
// not finished starting. Otherwise returns true when an attached reader has pending read requests, or when
// the stream's desired size is greater than zero.
function ShouldReadableStreamPull(stream) {
  const state = stream[readableStreamState];
  const pullingIsPointless =
      state === 'closed' ||
      state === 'errored' ||
      stream[readableStreamCloseRequested] === true ||
      stream[readableStreamStarted] === false;
  if (pullingIsPointless) {
    return false;
  }

  if (IsReadableStreamLocked(stream) === true) {
    const pendingReads = stream[readableStreamReader][readableStreamReaderReadRequests];
    if (pendingReads.length > 0) {
      // Someone is waiting for a chunk, regardless of how full the queue is.
      return true;
    }
  }

  return GetReadableStreamDesiredSize(stream) > 0;
}
// TODO TeeReadableStream
//
// Queue-with-sizes
//
// TODO(domenic): manipulating arrays seems fraught with peril in general; e.g. if someone defines getters/setters
// on the prototype chain, we can no longer shift and push.
// Removes the oldest `{ value, size }` pair from `queue` and returns its value.
//
// EnqueueValueWithSize adds each pair's size to the running total stored under `queueSize`, so the size of
// the removed pair is subtracted here; otherwise the total would only ever grow and the stream's desired
// size would never recover after chunks are read.
//
// The queue must not be empty (callers check `queue.length` first).
function DequeueValue(queue) {
  const pair = shiftArray(queue);

  // TODO(domenic): is subtracting numbers safe? Same concern as for the addition when enqueuing.
  queue[queueSize] -= pair.size;

  // An empty queue holds exactly nothing; this also discards accumulated floating-point rounding error.
  if (queue.length === 0) {
    queue[queueSize] = 0;
  }

  return pair.value;
}
// Appends `{ value, size }` to `queue` and adds the size to the queue's running total.
//
// `size` is converted with Number() first; the converted number is what gets stored and added.
// Throws a RangeError when the converted size is NaN, +Infinity or -Infinity; in that case the queue is
// left untouched.
function EnqueueValueWithSize(queue, value, size) {
  const numericSize = Number(size);

  const isUsable =
      !Number_isNaN(numericSize) && numericSize !== +Infinity && numericSize !== -Infinity;
  if (!isUsable) {
    throw new RangeError('size must be a finite, non-NaN number.');
  }

  // TODO(domenic): is adding numbers safe? Overridden valueOf could ruin our day.
  queue[queueSize] += numericSize;

  const pair = { value, size: numericSize };
  pushArray(queue, pair);
}
// Returns the running total stored on `queue` under the `queueSize` key (undefined when the queue has no
// such slot).
function GetTotalQueueSize(queue) {
  const runningTotal = queue[queueSize];
  return runningTotal;
}
//
// Other helpers
//
// Checks the two parts of a queuing strategy and returns them as `{ size, highWaterMark }`, with the high
// water mark converted to a number.
//
// Throws a TypeError when `size` is neither undefined nor a function, or when `highWaterMark` converts to
// NaN. Throws a RangeError when the converted high water mark is negative.
function ValidateAndNormalizeQueuingStrategy(size, highWaterMark) {
  const sizeIsAcceptable = size === undefined || typeof size === 'function';
  if (!sizeIsAcceptable) {
    throw new TypeError('size property of a queuing strategy must be a function');
  }

  const numericHighWaterMark = Number(highWaterMark);
  if (Number_isNaN(numericHighWaterMark)) {
    throw new TypeError('highWaterMark property of a queuing strategy must be convertible to a non-NaN number');
  }
  if (numericHighWaterMark < 0) {
    throw new RangeError('highWaterMark property of a queuing strategy must be nonnegative');
  }

  return { size, highWaterMark: numericHighWaterMark };
}
// Calls the method named `P` on object `O` with the argument list `args` and returns its result.
// Returns undefined without calling anything when `O[P]` is undefined. Exceptions from the property lookup
// or from the call itself propagate to the caller.
function InvokeOrNoop(O, P, args) {
  const method = O[P];
  return method === undefined ? undefined : applyFunction(method, O, args);
}
// Promise-returning variant of InvokeOrNoop: calls the method named `P` on `O` with `args` and returns a
// promise for the outcome instead of throwing.
//
// - `O[P]` undefined: promise resolved with undefined.
// - Lookup of `O[P]` or the call itself throws: promise rejected with the thrown value.
// - Otherwise: the method's return value passed through Promise_resolve.
function PromiseInvokeOrNoop(O, P, args) {
  try {
    const method = O[P];
    if (method === undefined) {
      return Promise_resolve(undefined);
    }
    return Promise_resolve(applyFunction(method, O, args));
  } catch (failure) {
    return Promise_reject(failure);
  }
}
// Builds a fresh iterator-result style object `{ value, done }` from the two arguments.
function CreateIterResultObject(value, done) {
  const result = { value: value, done: done };
  return result;
}
//
// Exports
//
// Publish the ReadableStream class as a property of the global object. The property is a non-enumerable,
// writable and configurable data property.
Object.defineProperties(global, {
  ReadableStream: {
    value: ReadableStream,
    writable: true,
    enumerable: false,
    configurable: true
  }
});
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment