Skip to content

Instantly share code, notes, and snippets.

@Hashbrown777
Last active February 28, 2022 00:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Hashbrown777/4a85109c91ee577939e9447414fba9b2 to your computer and use it in GitHub Desktop.
Save Hashbrown777/4a85109c91ee577939e9447414fba9b2 to your computer and use it in GitHub Desktop.
type DropFirst<T extends unknown[]> = T extends [any, ...infer U] ? U : never;
type ResolveQueue <T> = (output :T[]) => void;
type RejectQueue = (error :any) => void;
type InitQueueParams<T> = [ResolveQueue<T>, RejectQueue];
type InitQueue <T> = (...args :InitQueueParams<T>) => any;
type Enqueued <T> = () => Promise<T>;
export class Queue<T> extends Promise<T[]> {
private max :number;
private waiting :Array<Enqueued<T>>;
private current :number;
private index :number;
private output :T[]|null;
private starting :boolean;
private resolve !:null|ResolveQueue<T>;
private reject !:null|RejectQueue;
protected init :null|InitQueue<T> = (resolve, reject) => {
this.resolve = resolve;
this.reject = reject;
this.init = null;
return this;
};
//bug in TS static function cant resolve type of protected function
/*protected*/ constructor(
callback :InitQueue<T>,
maxConcurrent :number
) {
super(callback);
this.max = maxConcurrent
this.waiting = [];
this.current = 0;
this.index = -1;
this.output = null;
this.starting = false;
}
static async create<T>(...args :DropFirst<ConstructorParameters<typeof Queue>>) :Promise<[Queue<T>]> {
let output :Queue<T>;
const initArgs = await new Promise<InitQueueParams<T>>((resolve) => {
output = new Queue<T>(
(...initArgs) => { resolve(initArgs); },
...args
);
});
//return an array because JS is a dick and unrolls all nested promises
//INCLUDING the one we're deliberately trying to return AS A PROMISE (Queue extends Promise)
return [output!.init!(...initArgs)];
}
private async pop(index :number, next :Enqueued<T>) {
try {
this.output![index] = await next();
--this.current;
this.start();
}
catch (error) {
if (this.reject) {
this.resolve = null;
this.reject(error);
}
else {
--this.current;
this.start();
}
}
}
start() :this {
if (this.starting)
return this;
this.starting = true;
if (this.output == null)
this.output = new Array(this.waiting.length);
while (this.current < this.max && this.waiting.length) {
++this.current;
this.pop(++this.index, this.waiting.shift()!);
}
if (
(this.push == null && this.resolve) &&
!(this.waiting.length || this.current)
) {
this.resolve(this.output);
this.output = null;
}
else
this.starting = false;
return this;
}
push :null|((next :Enqueued<T>) => number) = (next :Enqueued<T>) => {
const index = this.index + this.waiting.push(next);
if (this.output != null) {
this.output.push(undefined as any);
this.start();
}
return index;
}
end() :this {
this.push = null;
this.start();
return this;
}
}
@Hashbrown777
Copy link
Author

Hashbrown777 commented May 18, 2021

//must create Queue using `create`
//Queue of 100 concurrent processes
//`push`ed items wont execute until `start` is called, start them immediately as they're added
const queue :Queue<any> = (await Queue.create<any>(100))[0].start();

for (/*some loop*/) {
    queue.push!(() => {
        pipeline(
            spawn(
                'pdftocairo',
                ['-svg', '-f', page, '-l', page, file, '-']
            ).stdout,
            createGzip({level:9}),
            fs_writeStream(`${dir}${page}.svg.gz`)
        )
    })
}

//Queue wont signal resolved until `end` of inputs is notified
await queue.end();

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment