Last active
December 13, 2017 08:38
-
-
Save the-spyke/f41371ea9c20d4ab593b73ba284b0a90 to your computer and use it in GitHub Desktop.
Scheduler with concurrency limit and execution throttling
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
const https = require('https'); | |
/** | |
* Create a `schedule` function to shcedule tasks within the specified limits | |
* @param {Number} [concurrency=2] - Concurrency limit | |
* @param {Number} [throughput=3] - Throughput limit (tasks per second, tps) | |
* @returns {Function} - The function to schedule tasks, returns a promise | |
*/ | |
function createScheduler(concurrency = 2, throughput = 3) { | |
console.log(`Scheduler: concurrency limit = ${concurrency}, throughput limit = ${throughput} tasks/sec`); | |
// Scheduled tasks queue in current throttle window or still active | |
// This way we will know if we have reached the limit of tasks per second | |
const tasks = []; | |
// Queued tasks | |
const queue = []; | |
// Task id counter. Task ID is used only for console logging | |
let id = 1; | |
// Current active tasks count | |
let activeTasks = 0; | |
// A primitive way to get current Unix time in milliseconds | |
function now() { | |
return new Date().valueOf(); | |
} | |
// Just for logging | |
function log(message) { | |
// Align the output | |
const padding = ' '.repeat(30 - message.length); | |
console.log(`${message}${padding} (active ${activeTasks}/${concurrency}, tps ${tasks.length}/${throughput}, queued ${queue.length})`); | |
} | |
function isWithinLimits() { | |
return activeTasks < concurrency && tasks.length < throughput; | |
} | |
// Removes tasks older than 1 sec from the list | |
function purgeOldTasks(time) { | |
// If there are tasks and the first one is older than 1 second | |
while (tasks.length > 0 && time - tasks[0].time > 1000) { | |
tasks.shift(); | |
} | |
} | |
// Called at the end of throttle period. Purges outdated tasks and starts as many new tasks as it can | |
function endThrottle() { | |
purgeOldTasks(now()); | |
log(`...done`); | |
startTasks(); | |
} | |
// Completes task execution and starts next task | |
function completeTask(task) { | |
const time = now(); | |
activeTasks -= 1; | |
purgeOldTasks(time); | |
log(`Completed #${task.id}`); | |
// If there are tasks in the queue, we need to start them | |
if (queue.length > 0) { | |
// If we haven't reached out limits yet | |
if (isWithinLimits()) { | |
startTasks(); | |
// If we have reached the throughput limit and this task was the last one active (no more tasks are running) | |
// We need to wait some time till we can start tasks again | |
} else if (activeTasks === 0) { | |
// Time the first task started + 1 second - current time | |
const timeToEndThrottle = tasks[0].time + 1000 - time; | |
log(`Throttling for ${timeToEndThrottle}ms...`); | |
// Wait | |
setTimeout(endThrottle, timeToEndThrottle); | |
} | |
} else if (activeTasks === 0) { | |
log(`Queue is empty`); | |
} | |
} | |
// Starts as many task as we have free resources before we reach limits | |
function startTasks() { | |
while (queue.length > 0 && isWithinLimits()) { | |
const task = queue.shift(); | |
startTask(task); | |
log(`Dequeued and started #${task.id}`); | |
} | |
} | |
// Starts the task | |
function startTask(task) { | |
activeTasks += 1; | |
// Task's start time | |
task.time = now(); | |
tasks.push(task); | |
// Runs and wait for result | |
return task.action() | |
// This is how we know if task is completed | |
.then(result => { | |
completeTask(task); | |
// It the task was queued we need to resolve the promise and no need to return | |
if (task.resolve) { | |
task.resolve(result); | |
} else { | |
return result; | |
} | |
}).catch(error => { | |
// Log the error just for debugging | |
console.log(`Error in the task #${task.id}:`); | |
console.error(error); | |
completeTask(task); | |
// It the task was queued we need to reject the promise and no need to return | |
if (task.reject) { | |
task.reject(error); | |
} else { | |
return Promise.reject(error); | |
} | |
}); | |
} | |
// Adds task to the queue, creates a promise prematurely so caller could chain to it right now. | |
function enqueueTask(task) { | |
queue.push(task); | |
return new Promise((res, rej) => { | |
// Save handlers for later | |
task.resolve = res; | |
task.reject = rej; | |
}); | |
} | |
// Wrap original action so you can schedule synchronous code. | |
// And catch synchronous errors in actions too. | |
function wrapAction(action) { | |
return function actionWrapper() { | |
try { | |
return Promise.resolve(action()); | |
} catch (error) { | |
return Promise.reject(error); | |
} | |
} | |
} | |
/** | |
* Schedules an action for execution | |
* @param {Function} taskAction | |
* @returns {Promise} - The promise of the task action's result | |
*/ | |
return function schedule(taskAction) { | |
const task = { | |
id: id++, | |
action: wrapAction(taskAction) | |
}; | |
if (activeTasks === 0 && queue.length === 0) { | |
purgeOldTasks(now()); | |
} | |
let promise; | |
if (isWithinLimits()) { | |
promise = startTask(task); | |
log(`Scheduling #${task.id}... started`); | |
} else { | |
promise = enqueueTask(task); | |
log(`Scheduling #${task.id}... queued`); | |
} | |
return promise; | |
}; | |
} | |
/*******************************************************************/ | |
// An example of an API endpoint | |
function getUserById(userId) { | |
return new Promise((resolve, reject) => { | |
https.get(`https://reqres.in/api/users/${userId}`, resp => { | |
let data = ''; | |
resp.on('data', chunk => data += chunk); | |
resp.on('end', () => { | |
try { | |
resolve(JSON.parse(data)); | |
} catch (error) { | |
reject(error); | |
} | |
}); | |
}).on('error', reject); | |
}); | |
} | |
/*******************************************************************/ | |
// Usage example | |
// Create a scheduler and an array of promises | |
const schedule = createScheduler(2, 3); | |
const promises = []; | |
// Schedules {count} requests for users | |
function requestUsers(count) { | |
for (let i = 0; i < count; i++) { | |
// Generate a random user ID | |
const userId = 1 + Math.floor(Math.random() * 10); | |
// Schedule an API call and same the promise | |
const promise = schedule(() => getUserById(userId)); | |
promises.push(promise); | |
} | |
} | |
// Schedule 10 requests for users right from the start | |
requestUsers(10); | |
// Then add 5 more requests for users after 1 second from the start | |
setTimeout(() => { | |
requestUsers(5); | |
}, 1000) | |
// Then add 2 more requests for users after 9 seconds from the start | |
setTimeout(() => { | |
requestUsers(2); | |
// Wait till all scheduled requests are completed | |
Promise.all(promises) | |
// Print the results | |
.then(results => { | |
console.log('Results:'); | |
console.log(JSON.stringify(results, null, 4)); | |
}) | |
// Print errors | |
.catch(error => { | |
console.error(error); | |
}) | |
.then(results => { | |
console.log('Done'); | |
}); | |
}, 9000) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
OMG!