Skip to content

Instantly share code, notes, and snippets.

@sayhicoelho
Last active February 27, 2024 00:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sayhicoelho/2e9114c13553fb63014b6031601f4af7 to your computer and use it in GitHub Desktop.
Save sayhicoelho/2e9114c13553fb63014b6031601f4af7 to your computer and use it in GitHub Desktop.
NodeJS: In Memory Worker + Worker Threads
const worker = require('./worker')
const data = {
// ...
}
worker.enqueue('send-reports', data)
exports.run = async data => {
// ...
}
const { Worker, isMainThread, parentPort } = require('worker_threads')
const { EventEmitter } = require('events')
const database = require('./database')
const jobs = {
'send-reports': require('./jobs/send-reports-job')
}
let worker = null
exports.enqueue = (jobName, data) => {
if (worker) {
worker.postMessage({
type: 'enqueue',
data: {
jobName,
data
}
})
}
}
async function main() {
if (isMainThread) {
worker = new Worker(__filename)
} else {
console.log('Worker started')
await database.connect()
const event = new EventEmitter()
const queue = []
const maxRetries = 3
let running = false
let currentJobId = 0
function enqueue(jobName, data) {
queue.push([++currentJobId, jobName, data])
if (!running) {
event.emit('next')
}
}
parentPort.on('message', e => {
switch (e.type) {
case 'enqueue':
enqueue(e.data.jobName, e.data.data)
break
}
})
event.on('run', async (jobId, jobName, data, retries) => {
try {
console.log(`Running job ${jobId}...`)
await jobs[jobName].run(data)
console.log(`Job ${jobId} done.`)
await new Promise(resolve => setTimeout(resolve, 1000))
event.emit('next')
} catch (err) {
console.error(`[${jobId}]: Job "${jobName}" failed to execute with reason: ${err?.message}`)
if (retries < maxRetries) {
event.emit('run', jobId, jobName, data, retries + 1)
} else {
event.emit('next')
}
}
})
event.on('next', () => {
running = true
const nextJob = queue.shift()
if (nextJob) {
const [jobId, jobName, data] = nextJob
event.emit('run', jobId, jobName, data, 1)
} else {
running = false
}
})
}
}
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment