Skip to content

Instantly share code, notes, and snippets.

@umihico
Created November 18, 2023 13:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save umihico/97f5b0f78c389f254b6dda2641c488a8 to your computer and use it in GitHub Desktop.
Save umihico/97f5b0f78c389f254b6dda2641c488a8 to your computer and use it in GitHub Desktop.
Node.js TypeScript multi-process (cluster.fork) example
import cluster from "cluster"
import { cpus } from "os"
/**
 * Runs a pull-based job queue across forked worker processes.
 *
 * Call this from the same entry script in both the primary and the workers
 * (the script is re-executed by `cluster.fork`):
 *
 * - In the primary process it forks one worker per CPU core (at least one),
 *   hands out the next job each time a worker reports `"ready"`, tells a
 *   worker to exit (`"exitNow"`) once the queue is exhausted, and resolves
 *   after every worker it forked has exited.
 * - In a worker process it registers the message handler, announces
 *   `"ready"`, and resolves immediately; the jobs are processed afterwards
 *   from the message handler.
 *
 * Each worker receives `WORKER_INDEX` and `WORKER_COUNT` environment variables.
 *
 * NOTE: every string yielded by the queue must be valid JSON text. The worker
 * runs `JSON.parse` on it before calling `jobConsumer`, so at runtime the
 * consumer receives the parsed value even though the parameter is annotated
 * as `string`.
 *
 * @param jobConsumer Invoked in a worker process once per job.
 * @param JobQueueGenerator Factory for the job queue; only invoked in the primary.
 * @returns A promise that settles as described above.
 * @throws In the primary, the "message" listener throws on any message other than `"ready"`.
 */
export const multiProcessHandler = async (
  jobConsumer: (job: string) => Promise<void>,
  JobQueueGenerator: () => Generator<string, void, unknown>,
): Promise<void> => {
  if (cluster.isPrimary) {
    type ForkedWorker = ReturnType<typeof cluster.fork>

    console.log(`Primary ${process.pid} is running`)
    // os.cpus() may return an empty array when CPU information is unavailable;
    // without the clamp no worker would be forked and no job would ever run.
    const numCPUs = Math.max(1, cpus().length)
    const jobQueue = JobQueueGenerator()

    // Workers pull work: each "ready" message is answered with the next job,
    // or with "exitNow" once the generator is exhausted.
    const dispatchNextJob = (worker: ForkedWorker, message: unknown): void => {
      if (message !== "ready") throw new Error(`unexpected message ${message}`)
      const next = jobQueue.next()
      if (next.done) {
        worker.send("exitNow")
      } else {
        worker.send(next.value)
      }
    }
    cluster.on("message", dispatchNextJob)

    try {
      const workerExits: Promise<void>[] = []
      for (let i = 0; i < numCPUs; i++) {
        console.log(`Forking process number ${i}...`)
        const worker = cluster.fork({
          WORKER_INDEX: i.toString(),
          WORKER_COUNT: numCPUs.toString(),
        })
        // Track only the workers forked here, so unrelated workers that may
        // exist in the same cluster are not waited on.
        workerExits.push(
          new Promise<void>((resolve) => {
            worker.once("exit", () => {
              console.log(`worker ${worker.process.pid} died`)
              resolve()
            })
          }),
        )
      }
      await Promise.all(workerExits)
    } finally {
      // Remove only our own listener; listeners registered by the caller on
      // the shared cluster object are left intact.
      cluster.removeListener("message", dispatchNextJob)
    }
  } else {
    console.log(
      `WORKER_INDEX ${process.env.WORKER_INDEX} (pid:${process.pid}) started`,
    )
    process.on("message", async (message) => {
      try {
        if (typeof message !== "string")
          throw new Error(`unexpected message ${message}`)
        if (message === "exitNow") {
          cluster.worker?.kill()
          return
        }
        const job = JSON.parse(message)
        await jobConsumer(job)
        // Ask the primary for the next job.
        process.send?.("ready")
      } catch (error) {
        // A failure here would otherwise surface as an unhandled promise
        // rejection, whose effect depends on the Node.js configuration.
        // Fail this worker explicitly; the primary observes its "exit" event.
        console.error(
          `WORKER_INDEX ${process.env.WORKER_INDEX} (pid:${process.pid}) failed`,
          error,
        )
        process.exit(1)
      }
    })
    // Initial handshake: request the first job.
    process.send?.("ready")
  }
}
// Demo: runs only when this file is executed directly (not when imported).
// It feeds 100 JSON-encoded jobs to multiProcessHandler; each job is logged
// and then takes one second.
if (require.main === module) {
  /** Yields the JSON text `{"item":"<n>"}` for n = 0..99. */
  const jobQueueGenerator = function* (): Generator<string, void, unknown> {
    for (let n = 0; n < 100; n++) {
      yield JSON.stringify({ item: String(n) })
    }
  }

  /** Logs the job together with the worker's identity, then resolves after one second. */
  const jobConsumer = (job: string): Promise<void> => {
    const workerTag = `WORKER_INDEX ${process.env.WORKER_INDEX} (pid:${process.pid})`
    console.log(`${workerTag} job:${JSON.stringify(job)}`)
    return new Promise<void>((resolve) => {
      setTimeout(resolve, 1000)
    })
  }

  // Fire-and-forget: the returned promise is intentionally not awaited here.
  void multiProcessHandler(jobConsumer, jobQueueGenerator)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment