Skip to content

Instantly share code, notes, and snippets.

@FormerlyZeroCool
Last active January 5, 2023 12:21
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save FormerlyZeroCool/42afcc4b21c4b2f947799db85c02f206 to your computer and use it in GitHub Desktop.
Save FormerlyZeroCool/42afcc4b21c4b2f947799db85c02f206 to your computer and use it in GitHub Desktop.
// Number of benchmark jobs: sizes the input array and the number of add_job() calls in example().
const jobs = 1200;
// Argument passed to fib() for every queued job and for the single-threaded baseline call.
const example_input = 27;
// Pool size requested (after Math.floor) for both worker pools created in example().
const threads = 16;
/**
 * Benchmark demo: pushes `jobs` Fibonacci computations through a
 * ProcessPoolUnordered and logs the elapsed wall-clock time once the last
 * result has been delivered to the callback.
 *
 * An ordered ProcessPool and its input array are set up as well, but the batch
 * call that would use them is commented out, so the first "time(s):" line only
 * measures an empty block.
 */
const example = async ():Promise<void> => {
    // The pool serialises the worker entry point with toString(), so each entry
    // point carries its own copy of fib instead of using the module-level one.
    // The explicit `:number` return type is needed because a self-referencing
    // arrow function cannot have its return type inferred under noImplicitAny.
    const pool = new ProcessPool<number, number>(Math.floor(threads), (input:number) => {
        let fib = (x:number):number => x <=1 ? x : fib(x-1) + fib(x-2);
        return fib(input);
    }, [], []);
    // One copy of example_input per job; only consumed by the commented-out batch call.
    const input:number[] = new Array<number>(jobs).fill(example_input);
    {
        const start_time = Date.now();
        //console.log(await pool.batch_call_parallel(input));
        console.log("time(s):", (Date.now() - start_time) / 1000);
    }
    const pool_unordered = new ProcessPoolUnordered<number, number>(Math.floor(threads), (input:number) => {
        let fib = (x:number):number => x <=1 ? x : fib(x-1) + fib(x-2);
        return fib(input);
    }, [], []);
    for(let i = 0; i < jobs; i++)
        pool_unordered.add_job(example_input);
    let sum = 0;
    let count = 0;
    const start_time = Date.now();
    await pool_unordered.process_jobs((result) => {
        count++;
        sum += result;
        // Report only when the final result arrives; the percentage is not
        // needed before that, so it is computed here rather than per callback.
        if(count === jobs)
        {
            const completion = Math.round(count / (jobs) * 10000) / 100;
            console.log("time(s):", (Date.now() - start_time) / 1000, "Percent complete: ", completion);
        }
    });
};
/**
 * Naive doubly-recursive Fibonacci, used as the single-threaded baseline workload.
 *
 * The return type is spelled out because a const arrow function that calls
 * itself cannot have its return type inferred (TS7022 under noImplicitAny).
 *
 * @param x index into the sequence; any value <= 1 is returned unchanged
 * @returns fib(x - 1) + fib(x - 2) for x > 1, otherwise x itself
 */
const fib = (x:number):number => x <= 1 ? x : fib(x - 1) + fib(x - 2);
// Single-threaded baseline: time one fib(example_input) call on the current thread.
const start_time = Date.now();
//for(let i = 0; i < jobs; i++)
//{
//fib(example_input);
//}
fib(example_input);
console.log((Date.now() - start_time) / 1000);
// example() is fire-and-forget; surface a failure instead of leaving the
// returned promise floating with an unhandled rejection.
example().catch((error:unknown) => {
    console.error(error);
});
/**
 * One named-import statement for the generated worker script.
 * ProcessPool renders it as `import {<fields joined by ','>} from '<path>'`.
 */
export interface Process_Module {
    /** Names of the bindings to import from the module. */
    fields:Array<string>;
    /** Module specifier written after `from`. */
    path:string;
}
/**
 * Envelope exchanged with a worker: the payload plus the id of the worker
 * slot the job was assigned to. The worker echoes `process_id` back with its
 * result so the pool knows which slot became free.
 */
export interface Process_Message <PAYLOAD_TYPE> {
    /** Job input on the way to the worker, job result on the way back. */
    data:PAYLOAD_TYPE;
    /** Index of the worker handling the job. */
    process_id:number;
}
/**
 * Fixed-size pool of module Web Workers that all execute the same `main`
 * function.
 *
 * The worker script is generated in the constructor by stringifying `main`
 * (plus optional helper functions and import lines) into a Blob. Because only
 * the source text travels to the worker, `main` and every helper must be
 * self-contained: they cannot use variables captured from the creating scope.
 */
export class ProcessPool <INPUT_TYPE, RESULT_TYPE> {
    /** Workers owned by the pool; a worker's index doubles as its process id. */
    workers:Worker[];
    /** Promise of the job most recently assigned to each worker, indexed by process id. */
    worker_promises:Promise<RESULT_TYPE>[];
    /** Process ids of the workers that are currently idle. */
    worker_free_list:number[];
    /** Object URL of the generated worker script. */
    code_url:string;
    /** Import descriptions that were written at the top of the worker script. */
    modules:Process_Module[];
    /** Jobs submitted through call_parallel() that have not settled yet. */
    processes_enqueued_or_running:number;
    /** Round-robin cursor selecting which busy worker the next queued job waits on. */
    last_enqueued_id:number;

    /**
     * @param poolSize number of workers to spawn; must be a positive integer
     * @param main function run inside a worker for every job
     * @param library_code named functions made available to `main`, each
     *        emitted as `const <name> = <source>;` (anonymous functions have
     *        no usable name and are not supported)
     * @param modules named imports written at the top of the worker script
     * @throws RangeError when poolSize is not a positive integer; a pool
     *         without workers could never complete a job
     */
    constructor(poolSize:number, main:(input:INPUT_TYPE) => RESULT_TYPE, library_code:Function[] = [], modules:Process_Module[] = [])
    {
        if(!Number.isInteger(poolSize) || poolSize < 1)
            throw new RangeError(`poolSize must be a positive integer, got ${poolSize}`);
        this.workers = [];
        this.worker_promises = [];
        this.worker_free_list = [];
        this.processes_enqueued_or_running = 0;
        this.last_enqueued_id = 0;
        this.modules = modules;
        const import_lines = modules.map((pm) => `import {${pm.fields.join(',')}} from '${pm.path}'\n`);
        // Every helper is terminated with ";\n" so that several helpers in a
        // row still form valid statements in the generated script.
        const library_lines = library_code.map((foo) => `const ${foo.name} = ${foo.toString()};\n`);
        const stringified_code = [
            ...import_lines,
            ...library_lines,
            "\nconst main = ", main.toString(), ";\n",
            `self.onmessage = (event) => {const data = main(event.data.data); postMessage({process_id:event.data.process_id, data:data}); }`
        ];
        // The global URL is used instead of window.URL so the pool can also be
        // constructed where no `window` object exists.
        this.code_url = URL.createObjectURL(new Blob(stringified_code, {
            type: "text/javascript"
        }));
        for(let i = 0; i < poolSize; i++)
        {
            this.worker_free_list.push(i);
            this.workers.push(this.createWorker());
        }
        this.worker_promises.length = poolSize;
    }

    /** Spawns one module worker running the generated script. */
    createWorker():Worker
    {
        return new Worker(this.code_url, { type:'module' });
    }

    /**
     * Submits one job and resolves with the value `main` returned for it.
     * The job counts towards processes_enqueued_or_running until it settles.
     *
     * @param data input passed to `main` inside the worker
     * @returns the job's result; rejects if the worker reports an error
     */
    async call_parallel(data:INPUT_TYPE):Promise<RESULT_TYPE>
    {
        this.processes_enqueued_or_running++;
        return await this._call_parallel(data);
    }

    /**
     * Scheduling core behind call_parallel(): takes an idle worker, or waits
     * for busy workers in round-robin order until one can be claimed, then
     * posts the job. Does not increment processes_enqueued_or_running itself.
     *
     * @param data input passed to `main` inside the worker
     * @returns the job's result; rejects if the worker reports an error
     */
    async _call_parallel(data:INPUT_TYPE):Promise<RESULT_TYPE>
    {
        let process_id = this.worker_free_list.pop();
        while(process_id === undefined)
        {
            process_id = this.last_enqueued_id++;
            this.last_enqueued_id %= this.workers.length;
            try
            {
                await this.worker_promises[process_id];
            }
            catch
            {
                // That failure belongs to the caller owning the awaited job;
                // here it only signals that the worker is done with it.
            }
            const index_in_free_list = this.worker_free_list.indexOf(process_id);
            if(index_in_free_list === -1)
            {
                // Another waiter claimed this worker first. If a different
                // worker is idle, start over to grab it. This must re-enter
                // _call_parallel, not call_parallel: the job has already been
                // counted once in processes_enqueued_or_running.
                if(this.worker_free_list.length)
                    return this._call_parallel(data);
                process_id = undefined;
            }
            else
                this.worker_free_list.splice(index_in_free_list, 1);
        }
        const assigned_id = process_id;
        const worker = this.workers[assigned_id];
        const promise = new Promise<RESULT_TYPE>((resolve, reject) => {
            worker.onmessage = (event:MessageEvent<Process_Message<RESULT_TYPE>>) => {
                this.worker_free_list.push(event.data.process_id);
                this.processes_enqueued_or_running--;
                resolve(event.data.data);
            };
            // Without this handler a throwing `main` would leave the promise
            // pending forever and the worker would never return to the free list.
            worker.onerror = (event:ErrorEvent) => {
                this.worker_free_list.push(assigned_id);
                this.processes_enqueued_or_running--;
                reject(new Error(event.message || "worker reported an error"));
            };
        });
        this.worker_promises[assigned_id] = promise;
        worker.postMessage({
            data:data,
            process_id:assigned_id
        });
        return promise;
    }

    /**
     * Submits every element of `data` as its own job and collects all results.
     *
     * @param data job inputs
     * @returns results in the order the Queue hands the pending promises back
     *          (presumably FIFO, i.e. input order -- Queue is defined outside
     *          this class, confirm there); rejects with the first failing job
     *          encountered while collecting
     */
    async batch_call_parallel(data:INPUT_TYPE[]):Promise<RESULT_TYPE[]>
    {
        const pending = new Queue<Promise<RESULT_TYPE>>();
        for(const rec of data)
            pending.push(this.call_parallel(rec));
        // Awaited directly instead of inside an async Promise executor, so a
        // rejected job rejects this call rather than leaving it pending forever.
        const final_result:RESULT_TYPE[] = [];
        while(pending.length)
            final_result.push(await pending.pop()!);
        return final_result;
    }
}
/**
 * Job queue on top of a ProcessPool that hands results to a callback as soon
 * as each job finishes, without preserving submission order.
 */
export class ProcessPoolUnordered <INPUT_TYPE, RESULT_TYPE> {
    /** Underlying worker pool that executes the jobs. */
    pool:ProcessPool<INPUT_TYPE, RESULT_TYPE>;
    /** Inputs added with add_job() that have not been handed to the pool yet. */
    input_queue:Queue<INPUT_TYPE>;

    /**
     * Creates the underlying ProcessPool with the same arguments.
     *
     * @param poolSize number of workers in the underlying pool
     * @param main function run inside a worker for every job
     * @param library_code helper functions forwarded to the pool
     * @param modules module imports forwarded to the pool
     */
    constructor(poolSize:number, main:(input:INPUT_TYPE) => RESULT_TYPE, library_code:Function[] = [], modules:Process_Module[] = [])
    {
        this.pool = new ProcessPool<INPUT_TYPE, RESULT_TYPE>(poolSize, main, library_code, modules);
        this.input_queue = new Queue<INPUT_TYPE>();
    }

    /**
     * @returns true while inputs are still waiting in the queue; jobs already
     *          handed to the pool are not taken into account
     */
    processing():boolean
    {
        return this.input_queue.length > 0;
    }

    /** Queues one input; nothing runs until process_jobs() is called. */
    add_job(data:INPUT_TYPE):void
    {
        this.input_queue.push(data);
    }

    /**
     * Drains the input queue through the pool, invoking `apply` for every
     * result in completion order, and resolves only after every dispatched
     * job has been delivered. Dispatch is throttled so that at most three jobs
     * per worker are outstanding on the pool at a time.
     *
     * @param apply callback receiving each job result
     * @returns a promise that settles once the queue is empty and all
     *          dispatched jobs have finished; it rejects with the first error
     *          raised by a job or by `apply`, after the remaining jobs ran
     */
    async process_jobs(apply:(result:RESULT_TYPE) => void):Promise<void>
    {
        const max_outstanding = this.pool.workers.length * 3;
        const in_flight = new Set<Promise<void>>();
        let failed = false;
        let first_error:unknown;
        const dispatch = (input:INPUT_TYPE):void => {
            const tracked:Promise<void> = this.pool.call_parallel(input)
                .then(apply)
                .catch((error:unknown) => {
                    // Remember the first failure and keep going; it is
                    // rethrown once everything has been processed.
                    if(!failed)
                    {
                        failed = true;
                        first_error = error;
                    }
                })
                .finally(() => {
                    in_flight.delete(tracked);
                });
            in_flight.add(tracked);
        };
        while(this.input_queue.length)
        {
            // Dispatch while the pool has capacity. When none of our jobs is
            // in flight one job is dispatched regardless, so the loop always
            // has something to await and can never spin without yielding.
            while(this.input_queue.length &&
                (this.pool.processes_enqueued_or_running < max_outstanding || in_flight.size === 0))
                dispatch(this.input_queue.pop()!);
            if(in_flight.size)
                await Promise.race(in_flight);
        }
        // The queue is empty but jobs may still be running: wait for them so
        // callers can rely on every result having been applied.
        await Promise.all(in_flight);
        if(failed)
            throw first_error;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment