Skip to content

Instantly share code, notes, and snippets.

@guilhermebkel
Last active January 9, 2020 10:00
Show Gist options
  • Save guilhermebkel/44ba77ec5a0f17456093f7da79e01346 to your computer and use it in GitHub Desktop.
Save guilhermebkel/44ba77ec5a0f17456093f7da79e01346 to your computer and use it in GitHub Desktop.
async-queue
/*
Service to make async process of any payload array, with concurrency support.
Author: Guilherme Mota Bromonschenkel Lima
Github: guilhermebkel
*/
// Milliseconds between checks to look for some available spot on the queue.
const REFRESH_TIME = 100;
let worker = () => {};
let options = {
concurrency: 1,
retries: 0,
delay: 0,
};
let payload = [];
let enqueued = 0;
const delay = async (milliseconds) => {
return await new Promise((callback) => {
setTimeout(() => {
callback();
}, milliseconds);
});
};
const set = (_worker, _options = {}) => {
if (typeof worker !== "function") {
throw new Error("Worker must be a function");
} else if (typeof options !== "object") {
throw new Error("Options must be an object");
}
worker = _worker
options = {
...options,
..._options,
}
};
const add = (_payload) => {
if (typeof payload === "undefined" || payload === null || payload === undefined) {
throw new Error("Payload can not be undefined or null");
}
// Recognizes if the given payload is an array in order to
// decide how the payload variable will be like.
if (Array.isArray(_payload)) {
payload = _payload;
} else {
payload.push(_payload);
}
};
const initWorker = async () => {
// We take the first element of payload array and start processing it.
// Tip: We're doing it by [0] because of the lack of performance of .shift()
const payloadOnProcess = payload[0];
// Removes the payload that is about to be processed from the list
payload = payload.filter(payload => payload.metadata.jobId !== payloadOnProcess.metadata.jobId);
enqueued++;
try {
// In case the maximum retries have not been reached, we start the worker
if (payloadOnProcess.metadata.retries <= options.retries) {
await worker(payloadOnProcess);
}
} catch(error) {
console.warn(error);
// In case of error, if there's a retry option enabled we add the payload back
// to queue to start processing it again.
if (options.retries && payloadOnProcess.metadata.retries < options.retries) {
payloadOnProcess.metadata.retries++;
payloadOnProcess.metadata.error = error;
add(payloadOnProcess);
await delay(options.delay);
}
}
enqueued--;
};
const setMetadata = () => {
for (let i=0; i<payload.length; i++) {
payload[i].metadata = {
jobId: i,
error: null,
retries: 0,
};
}
};
const process = async () => {
if (!payload.length) {
throw new Error("Payload not supplied")
}
setMetadata();
await new Promise(callback => {
const checkQueue = setInterval(() => {
// If the amount of enqueued jobs is less than the limit of queue concurrency
// and the payload has more than one item, we start processing it.
if (enqueued < options.concurrency && payload.length > 0) {
initWorker();
} else if (enqueued === 0) {
clearInterval(checkQueue);
callback();
}
}, REFRESH_TIME);
})
};
const info = () => ({
...options,
payload,
});
export default {
set,
add,
process,
info,
};
const Queue = require("./async-queue")
const payload = { name: "guilherme", nickname: "motinha" }
const arrPayload = []
let chaos = 1000;
const worker = async payload => {
console.log ("STARTED: ", payload.name)
await new Promise(resolve => {
setTimeout(() => {
console.log("FINISHED: ", payload.name)
resolve()
chaos = chaos - 100
}, 2000 + chaos)
})
chaos = chaos + 300
}
Queue.set(worker, { concurrency: 10 })
for(let i=0; i<30; i++) {
Queue.add({
name: payload.name + '~' + i,
nickname: payload.nickname + '~' + i
})
}
const init = async () => {
console.log("Started Queuing")
await Queue.process()
console.log("Finished Queuing")
}
init()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment