Last active
January 5, 2023 12:21
-
-
Save FormerlyZeroCool/42afcc4b21c4b2f947799db85c02f206 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Number of jobs submitted to the pool by the benchmark. */
const jobs = 1200;
/** Argument passed to the Fibonacci workload for every job. */
const example_input = 27;
/** Number of workers requested when a pool is constructed. */
const threads = 16;
/**
 * Benchmark driver for the worker pools.
 *
 * Constructs an ordered ProcessPool and an input batch (the ordered run itself
 * is currently commented out), then feeds `jobs` Fibonacci jobs through a
 * ProcessPoolUnordered and logs the elapsed time once every result has arrived.
 */
const example = async (): Promise<void> =>
{
    // Worker entry point. ProcessPool embeds it via toString(), so it carries
    // its own fib instead of referring to anything declared in this file.
    const fib_job = (input: number): number =>
    {
        const fib = (x: number): number => x <= 1 ? x : fib(x - 1) + fib(x - 2);
        return fib(input);
    };

    const pool = new ProcessPool<number, number>(Math.floor(threads), fib_job, [], []);
    const input = new Array<number>(jobs).fill(example_input);
    {
        const start_time = Date.now();
        //console.log(await pool.batch_call_parallel(input));
        // NOTE: while the batch call above stays disabled this reports roughly 0 seconds.
        console.log("time(s):", (Date.now() - start_time) / 1000);
    }

    const pool_unordered = new ProcessPoolUnordered<number, number>(Math.floor(threads), fib_job, [], []);
    for (let i = 0; i < jobs; i++)
        pool_unordered.add_job(example_input);

    let count = 0;
    const start_time = Date.now();
    await pool_unordered.process_jobs(() => {
        count++;
        // Report from inside the callback, once the last result has been delivered.
        if (count === jobs)
        {
            const completion = Math.round(count / (jobs) * 10000) / 100;
            console.log("time(s):", (Date.now() - start_time) / 1000, "Percent complete: ", completion);
        }
    });
};
/**
 * Naive doubly recursive Fibonacci, used as a CPU-bound workload.
 * Inputs of 1 or less are returned unchanged.
 */
const fib = (x: number): number => x <= 1 ? x : fib(x - 1) + fib(x - 2);
// Single-threaded baseline: time one direct fib call on the current thread.
const start_time = Date.now();
//for(let i = 0; i < jobs; i++)
//{
//fib(example_input);
//}
fib(example_input);
console.log((Date.now() - start_time) / 1000);
// Report a failed benchmark run instead of leaving the promise floating.
example().catch((error: unknown) => {
    console.error(error);
});
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** One ES module import to emit at the top of the generated worker script. */
export interface Process_Module {
    /** Module specifier placed after `from` in the generated import statement. */
    path: string;
    /** Binding names placed between the braces of the generated import statement. */
    fields: string[];
}
/** Envelope exchanged with a worker: a payload tagged with a process id. */
export interface Process_Message<INPUT_TYPE> {
    /** Id of the worker slot the message belongs to. */
    process_id: number;
    /** Job input on the way to the worker, job result on the way back. */
    data: INPUT_TYPE;
}
/**
 * Fixed-size pool of Web Workers that all run the same `main` function.
 *
 * The constructor generates a single worker script (module imports, library
 * helpers, `main`, and a message handler) and starts the workers from it.
 * Each call is posted to an idle worker, or waits until one becomes idle.
 */
export class ProcessPool<INPUT_TYPE, RESULT_TYPE> {
    /** Worker instances; a worker's index doubles as its process id. */
    workers: Worker[];
    /** Promise of the job most recently dispatched to each worker, by process id. */
    worker_promises: Promise<RESULT_TYPE>[];
    /** Process ids of the workers that are currently idle. */
    worker_free_list: number[];
    /** Object URL of the generated worker script. */
    code_url: string;
    /** Module imports included in the generated worker script. */
    modules: Process_Module[];
    /** Calls that have been submitted and have not yet delivered a result or failure. */
    processes_enqueued_or_running: number;
    /** Process id the next blocked caller will wait on (advanced round robin). */
    last_enqueued_id: number;

    /**
     * @param poolSize number of workers to start
     * @param main function each worker runs per job; it is embedded via toString()
     * @param library_code helpers emitted as `const <name> = <source>` ahead of `main`
     * @param modules named imports emitted at the top of the worker script
     */
    constructor(poolSize:number, main:(input:INPUT_TYPE) => RESULT_TYPE, library_code:Function[] = [], modules:Process_Module[] = [])
    {
        this.workers = [];
        this.worker_promises = [];
        this.worker_free_list = [];
        this.processes_enqueued_or_running = 0;
        this.last_enqueued_id = 0;
        this.modules = modules;

        const import_lines = modules.map((pm) => `import {${pm.fields.join(',')}} from '${pm.path}'\n`);
        // Terminate every helper declaration so consecutive helpers do not run together.
        const library_lines = library_code.map((foo) => `const ${foo.name} = ${foo.toString()};\n`);
        const stringified_code = [
            ...import_lines,
            ...library_lines,
            "\nconst main = ", main.toString(), ";\n",
            `self.onmessage = (event) => {const data = main(event.data.data); postMessage({process_id:event.data.process_id, data:data}); }`,
        ];
        console.log(stringified_code.join(''));
        this.code_url = window.URL.createObjectURL(new Blob(stringified_code, {
            type: "text/javascript"
        }));

        for (let i = 0; i < poolSize; i++)
        {
            this.worker_free_list.push(i);
            this.workers.push(this.createWorker());
        }
        // Size from the workers actually created so a fractional poolSize cannot throw here.
        this.worker_promises.length = this.workers.length;
    }

    /** Starts one module worker from the generated script. */
    createWorker():Worker
    {
        return new Worker(this.code_url, { type:'module' });
    }

    /**
     * Runs `main(data)` on a worker.
     *
     * @returns a promise for the worker's result; it rejects if the worker
     *          reports an error or the pool has no workers
     */
    async call_parallel(data:INPUT_TYPE):Promise<RESULT_TYPE>
    {
        if (this.workers.length === 0)
            throw new Error("ProcessPool has no workers to run jobs on");
        this.processes_enqueued_or_running++;
        return await this._call_parallel(data);
    }

    /**
     * Acquires a worker and dispatches `data` to it. Does not touch the
     * submission counter on the way in; `call_parallel` counts each job once.
     */
    async _call_parallel(data:INPUT_TYPE):Promise<RESULT_TYPE>
    {
        let process_id = this.worker_free_list.pop();
        while (process_id === undefined)
        {
            // Every worker is busy: wait for the next one in round-robin order.
            const awaited_id = this.last_enqueued_id;
            this.last_enqueued_id = (awaited_id + 1) % this.workers.length;
            try
            {
                await this.worker_promises[awaited_id];
            }
            catch
            {
                // That job's failure is delivered to its own caller; here it only signals a free worker.
            }
            const free_index = this.worker_free_list.indexOf(awaited_id);
            if (free_index !== -1)
            {
                this.worker_free_list.splice(free_index, 1);
                process_id = awaited_id;
            }
            else
            {
                // Another waiter claimed that worker first; take any idle one, or keep waiting.
                process_id = this.worker_free_list.pop();
            }
        }

        const worker_id:number = process_id;
        const worker = this.workers[worker_id];
        // Settle at most once per job, then return the worker to the free list.
        const release = ():void => {
            worker.onmessage = null;
            worker.onerror = null;
            this.worker_free_list.push(worker_id);
            this.processes_enqueued_or_running--;
        };
        //return promise that will resolve to worker result
        const promise = new Promise<RESULT_TYPE>((resolve, reject) => {
            worker.onmessage = (event:MessageEvent<Process_Message<RESULT_TYPE>>) => {
                release();
                resolve(event.data.data);
            };
            worker.onerror = (event:ErrorEvent) => {
                release();
                reject(new Error(event.message || "Worker job failed"));
            };
        });
        this.worker_promises[worker_id] = promise;
        worker.postMessage({
            data:data,
            process_id:worker_id
        });
        return promise;
    }

    /**
     * Submits every element of `data` and resolves with the results in input order.
     * Rejects as soon as any job fails.
     */
    async batch_call_parallel(data:INPUT_TYPE[]):Promise<RESULT_TYPE[]>
    {
        return Promise.all(data.map((rec) => this.call_parallel(rec)));
    }
}
/**
 * Job queue in front of a ProcessPool that hands each result to a callback
 * as soon as it is available, in whatever order the jobs finish.
 */
export class ProcessPoolUnordered<INPUT_TYPE, RESULT_TYPE> {
    /** Pool that executes the jobs. */
    pool:ProcessPool<INPUT_TYPE, RESULT_TYPE>;
    /** Inputs that have been added and not yet submitted to the pool. */
    input_queue:Queue<INPUT_TYPE>;

    /** Takes the same arguments as the ProcessPool constructor and forwards them. */
    constructor(poolSize:number, main:(input:INPUT_TYPE) => RESULT_TYPE, library_code:Function[] = [], modules:Process_Module[] = [])
    {
        this.pool = new ProcessPool<INPUT_TYPE, RESULT_TYPE>(poolSize, main, library_code, modules);
        this.input_queue = new Queue<INPUT_TYPE>();
    }

    /** True while inputs are still waiting to be submitted to the pool. */
    processing():boolean
    {
        return this.input_queue.length > 0;
    }

    /** Queues one input; nothing runs until `process_jobs` is called. */
    add_job(data:INPUT_TYPE):void
    {
        this.input_queue.push(data);
    }

    /**
     * Drains the queue through the pool, keeping at most three jobs per worker
     * in flight, and calls `apply` with each result as it arrives.
     *
     * Resolves only after every submitted job has settled and `apply` has run
     * for each result. If a job or `apply` fails, the remaining jobs are still
     * processed and the first failure is rethrown at the end.
     */
    async process_jobs(apply:(result:RESULT_TYPE) => void):Promise<void>
    {
        const max_in_flight = this.pool.workers.length * 3;
        if (max_in_flight === 0)
        {
            // Without workers nothing could ever be dequeued; fail instead of spinning.
            if (this.input_queue.length)
                throw new Error("ProcessPoolUnordered has no workers to run jobs on");
            return;
        }

        const in_flight = new Set<Promise<void>>();
        const failures:unknown[] = [];
        while (this.input_queue.length)
        {
            while (in_flight.size < max_in_flight && this.input_queue.length)
            {
                const job:Promise<void> = this.pool.call_parallel(this.input_queue.pop()!)
                    .then(apply)
                    .catch((error:unknown) => { failures.push(error); })
                    .finally(() => { in_flight.delete(job); });
                in_flight.add(job);
            }
            // Window is full: wait for any one job to finish before refilling.
            if (this.input_queue.length)
                await Promise.race(in_flight);
        }
        // Do not report completion while results are still outstanding.
        await Promise.all(in_flight);
        if (failures.length)
            throw failures[0];
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment