Last active
January 9, 2020 10:00
-
-
Save guilhermebkel/44ba77ec5a0f17456093f7da79e01346 to your computer and use it in GitHub Desktop.
async-queue
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
	Service that asynchronously processes every item of a payload array, with concurrency support.
	Author: Guilherme Mota Bromonschenkel Lima
	Github: guilhermebkel
*/
// Polling period, in milliseconds, between two checks for a free spot on the queue.
const REFRESH_TIME = 100;

// Function invoked with each job; a no-op until a real worker is registered.
let worker = () => {};

// Queue settings: jobs allowed in flight at once, retries allowed per failed job,
// and the milliseconds waited when a failed job is retried.
let options = {
	concurrency: 1,
	retries: 0,
	delay: 0,
};

// Jobs still waiting to be picked up.
let payload = [];

// Number of jobs currently being handled by a worker.
let enqueued = 0;
/**
 * Pauses the calling async flow for the given amount of time.
 *
 * @param {number} milliseconds - How long to wait before resolving.
 * @returns {Promise<void>} Promise resolved (with no value) once the timer fires.
 */
const delay = (milliseconds) => new Promise((resolve) => {
	setTimeout(() => resolve(), milliseconds);
});
/**
 * Registers the worker function and merges the given options into the current ones.
 *
 * @param {Function} _worker - Function called with each job taken from the queue.
 * @param {Object} [_options={}] - Settings to override (concurrency, retries, delay).
 * @throws {Error} When `_worker` is not a function or `_options` is not an object.
 */
const set = (_worker, _options = {}) => {
	// Validate the incoming arguments, not the module-level state they are about
	// to replace (that state is always valid, so checking it never rejected anything).
	if (typeof _worker !== "function") {
		throw new Error("Worker must be a function");
	} else if (typeof _options !== "object") {
		throw new Error("Options must be an object");
	}

	worker = _worker;

	// Later keys win, so only the supplied settings are overridden.
	options = {
		...options,
		..._options,
	};
};
/**
 * Adds work to the queue.
 *
 * An array replaces the current queue content; any other value is appended
 * to it as a single job.
 *
 * @param {*} _payload - A list of jobs, or a single job.
 * @throws {Error} When `_payload` is undefined or null.
 */
const add = (_payload) => {
	// Check the argument itself: the module-level queue is always defined,
	// so validating it would let null/undefined jobs slip through.
	if (_payload === undefined || _payload === null) {
		throw new Error("Payload can not be undefined or null");
	}

	if (Array.isArray(_payload)) {
		// Copy the list so that later pushes never mutate the caller's array.
		payload = [..._payload];
	} else {
		payload.push(_payload);
	}
};
/**
 * Takes the first job off the queue and runs the worker on it.
 *
 * The job is removed from the queue and the in-flight counter is incremented
 * synchronously, before the first await. When the worker fails and the job has
 * retries left, its metadata is updated and it is put back on the queue.
 *
 * @returns {Promise<void>} Resolved once the job has finished, failed, or been re-queued.
 */
const initWorker = async () => {
	// Detach the head of the queue without mutating the current array and without
	// inspecting the other queued items.
	const [payloadOnProcess, ...remaining] = payload;
	payload = remaining;

	enqueued++;

	try {
		// In case the maximum retries have not been reached, we start the worker
		if (payloadOnProcess.metadata.retries <= options.retries) {
			await worker(payloadOnProcess);
		}
	} catch (error) {
		console.warn(error);

		// In case of error, if there's a retry option enabled we add the payload back
		// to the queue to start processing it again.
		if (options.retries && payloadOnProcess.metadata.retries < options.retries) {
			payloadOnProcess.metadata.retries++;
			payloadOnProcess.metadata.error = error;

			// Wait before re-queuing: a job that is already back on the queue could be
			// picked up by another free slot while this one is still waiting.
			await delay(options.delay);
			add(payloadOnProcess);
		}
	} finally {
		// Always release the slot, even if the error handling above throws.
		enqueued--;
	}
};
/**
 * Stamps every queued job with fresh bookkeeping data: its position in the
 * queue as `jobId`, no recorded error and zero retries. Existing metadata
 * on a job is overwritten.
 */
const setMetadata = () => {
	for (const [position, job] of payload.entries()) {
		job.metadata = { jobId: position, error: null, retries: 0 };
	}
};
/**
 * Processes every queued job, respecting the configured concurrency.
 *
 * The queue is polled every REFRESH_TIME milliseconds; each poll starts at most
 * one job, and the returned promise settles once no job is waiting or in flight.
 *
 * @returns {Promise<void>} Resolved when the queue has been drained.
 * @throws {Error} When the queue is empty, or when starting a job fails unexpectedly.
 */
const process = async () => {
	if (!payload.length) {
		throw new Error("Payload not supplied");
	}

	setMetadata();

	await new Promise((resolve, reject) => {
		const checkQueue = setInterval(() => {
			// If the amount of enqueued jobs is less than the limit of queue concurrency
			// and the payload has at least one item, we start processing it.
			if (enqueued < options.concurrency && payload.length > 0) {
				// Never leave the job promise floating: an unexpected failure stops the
				// polling and is reported to the caller instead of going unhandled.
				initWorker().catch((error) => {
					clearInterval(checkQueue);
					reject(error);
				});
			} else if (enqueued === 0) {
				clearInterval(checkQueue);
				resolve();
			}
		}, REFRESH_TIME);
	});
};
/**
 * Reports the current state of the queue.
 *
 * @returns {Object} A new object holding every current option plus the
 * `payload` array of jobs still waiting (the live array, not a copy).
 */
const info = () => {
	const snapshot = Object.assign({}, options);
	snapshot.payload = payload;

	return snapshot;
};
// Public API of the queue: configure it, feed it, run it and inspect it.
export default { set, add, process, info };
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const Queue = require("./async-queue") | |
// Template job whose fields are suffixed with an index to build each queued job.
const payload = {
	name: "guilherme",
	nickname: "motinha",
};

// NOTE(review): not referenced anywhere in the visible script.
const arrPayload = [];

// Extra milliseconds added to each job's simulated duration; it drifts as jobs
// start and finish so that the jobs do not all take the same time.
let chaos = 1000;
/**
 * Demo worker: logs when a job starts, waits 2000 ms plus the current `chaos`
 * value, logs when it finishes, and shifts `chaos` along the way (-100 when the
 * timer fires, +300 once the wait is over).
 *
 * @param {Object} job - Job to simulate; its `name` is used in the log output.
 * @returns {Promise<void>} Resolved once the simulated work is done.
 */
const worker = async (job) => {
	console.log("STARTED: ", job.name);

	const duration = 2000 + chaos;

	await new Promise((done) => {
		setTimeout(() => {
			console.log("FINISHED: ", job.name);
			done();
			chaos -= 100;
		}, duration);
	});

	chaos += 300;
};
// Register the demo worker and allow up to ten jobs in flight at once.
Queue.set(worker, { concurrency: 10 });

// Queue thirty jobs, each one a copy of the template suffixed with its index.
for (let index = 0; index < 30; index++) {
	const job = {
		name: `${payload.name}~${index}`,
		nickname: `${payload.nickname}~${index}`,
	};

	Queue.add(job);
}
/**
 * Runs the whole queue, logging when processing starts and when it is over.
 *
 * @returns {Promise<void>} Resolved once Queue.process() has finished.
 */
const init = async () => {
	console.log("Started Queuing");

	await Queue.process();

	console.log("Finished Queuing");
};

// Do not leave the promise floating: report a failed run explicitly and make
// the script exit with a failure code instead of relying on the runtime's
// handling of unhandled rejections.
init().catch((error) => {
	console.error(error);
	process.exitCode = 1;
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment