Skip to content

Instantly share code, notes, and snippets.

@tomhicks
Last active July 5, 2022 08:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save tomhicks/2e006ff9e5de165bb8d35accd4975cdd to your computer and use it in GitHub Desktop.
Save tomhicks/2e006ff9e5de165bb8d35accd4975cdd to your computer and use it in GitHub Desktop.
Lets you continually push to a "stream" of promises, to be notified of their resolution/rejection, with the guarantee that the resolved values will be emitted in order, skipping any that come back out of order.
type PromiseSequencer<T> = {
/**
* Add a promise to the end of the queue. When this promise resolves, you
* may be notified via onLatest, assuming that no later promise has resolved
* in the meantime.
*/
push(promise: Promise<T>): void;
/**
* Remove all promises from the queue. Any promises in the queue can still
* resolve or reject, but you will not be notified via the onLatest or onError
* callbacks.
*/
clear(): void;
};
type SequencerParams<T> = {
/**
* Called when a result comes back. Guaranteed to always be called in the same
* order as the promises were pushed, but you may not get a call for every
* promise.
*/
onLatest(result: T): void;
/**
* Called when the last promise in the queue rejects. This means we won't ever
* reach our final desired state.
*/
onErrorWithNoRemainingPromises?(err: unknown): void;
/**
* Called when a non-terminal promise rejects. A later promise in the queue could
* still give us a more up-to-date value, so this is less "serious" than the case
* when the final promise rejects.
*/
onErrorWithRemainingPromises?(err: unknown): void;
};
export function makePromiseSequencer<T>({
onLatest,
onErrorWithRemainingPromises,
onErrorWithNoRemainingPromises,
}: SequencerParams<T>): PromiseSequencer<T> {
let promises: Array<Promise<T>> = [];
return {
clear() {
promises = [];
},
push(promise) {
promises.push(promise);
promise.then(
function (r) {
const indexOfThisPromise = promises.indexOf(promise);
if (indexOfThisPromise > -1) {
// remove all older promises as this result supersedes any
// older ones that might resolve or reject later.
promises = promises.slice(indexOfThisPromise + 1);
onLatest(r);
}
},
function (e: unknown) {
const indexOfThisPromise = promises.indexOf(promise);
if (indexOfThisPromise > -1) {
// there's a semantic difference between the last promise in the queue
// rejecting, or any other one rejecting. If the last one errors, then
// we know we never reached our desired "final" state, which is a different
// class of problem than if an older item fails.
if (indexOfThisPromise === promises.length - 1) {
onErrorWithNoRemainingPromises?.(e);
} else {
onErrorWithRemainingPromises?.(e);
}
// remove this promise from the queue, but leave olders ones
// as they are still relevant, because this one errored.
promises.splice(indexOfThisPromise, 1);
}
}
);
},
};
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment