Skip to content

Instantly share code, notes, and snippets.

@the-spyke
Last active December 13, 2017 08:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save the-spyke/f41371ea9c20d4ab593b73ba284b0a90 to your computer and use it in GitHub Desktop.
Save the-spyke/f41371ea9c20d4ab593b73ba284b0a90 to your computer and use it in GitHub Desktop.
Scheduler with concurrency limit and execution throttling
'use strict';
const https = require('https');
/**
* Create a `schedule` function to shcedule tasks within the specified limits
* @param {Number} [concurrency=2] - Concurrency limit
* @param {Number} [throughput=3] - Throughput limit (tasks per second, tps)
* @returns {Function} - The function to schedule tasks, returns a promise
*/
function createScheduler(concurrency = 2, throughput = 3) {
console.log(`Scheduler: concurrency limit = ${concurrency}, throughput limit = ${throughput} tasks/sec`);
// Scheduled tasks queue in current throttle window or still active
// This way we will know if we have reached the limit of tasks per second
const tasks = [];
// Queued tasks
const queue = [];
// Task id counter. Task ID is used only for console logging
let id = 1;
// Current active tasks count
let activeTasks = 0;
// A primitive way to get current Unix time in milliseconds
function now() {
return new Date().valueOf();
}
// Just for logging
function log(message) {
// Align the output
const padding = ' '.repeat(30 - message.length);
console.log(`${message}${padding} (active ${activeTasks}/${concurrency}, tps ${tasks.length}/${throughput}, queued ${queue.length})`);
}
function isWithinLimits() {
return activeTasks < concurrency && tasks.length < throughput;
}
// Removes tasks older than 1 sec from the list
function purgeOldTasks(time) {
// If there are tasks and the first one is older than 1 second
while (tasks.length > 0 && time - tasks[0].time > 1000) {
tasks.shift();
}
}
// Called at the end of throttle period. Purges outdated tasks and starts as many new tasks as it can
function endThrottle() {
purgeOldTasks(now());
log(`...done`);
startTasks();
}
// Completes task execution and starts next task
function completeTask(task) {
const time = now();
activeTasks -= 1;
purgeOldTasks(time);
log(`Completed #${task.id}`);
// If there are tasks in the queue, we need to start them
if (queue.length > 0) {
// If we haven't reached out limits yet
if (isWithinLimits()) {
startTasks();
// If we have reached the throughput limit and this task was the last one active (no more tasks are running)
// We need to wait some time till we can start tasks again
} else if (activeTasks === 0) {
// Time the first task started + 1 second - current time
const timeToEndThrottle = tasks[0].time + 1000 - time;
log(`Throttling for ${timeToEndThrottle}ms...`);
// Wait
setTimeout(endThrottle, timeToEndThrottle);
}
} else if (activeTasks === 0) {
log(`Queue is empty`);
}
}
// Starts as many task as we have free resources before we reach limits
function startTasks() {
while (queue.length > 0 && isWithinLimits()) {
const task = queue.shift();
startTask(task);
log(`Dequeued and started #${task.id}`);
}
}
// Starts the task
function startTask(task) {
activeTasks += 1;
// Task's start time
task.time = now();
tasks.push(task);
// Runs and wait for result
return task.action()
// This is how we know if task is completed
.then(result => {
completeTask(task);
// It the task was queued we need to resolve the promise and no need to return
if (task.resolve) {
task.resolve(result);
} else {
return result;
}
}).catch(error => {
// Log the error just for debugging
console.log(`Error in the task #${task.id}:`);
console.error(error);
completeTask(task);
// It the task was queued we need to reject the promise and no need to return
if (task.reject) {
task.reject(error);
} else {
return Promise.reject(error);
}
});
}
// Adds task to the queue, creates a promise prematurely so caller could chain to it right now.
function enqueueTask(task) {
queue.push(task);
return new Promise((res, rej) => {
// Save handlers for later
task.resolve = res;
task.reject = rej;
});
}
// Wrap original action so you can schedule synchronous code.
// And catch synchronous errors in actions too.
function wrapAction(action) {
return function actionWrapper() {
try {
return Promise.resolve(action());
} catch (error) {
return Promise.reject(error);
}
}
}
/**
* Schedules an action for execution
* @param {Function} taskAction
* @returns {Promise} - The promise of the task action's result
*/
return function schedule(taskAction) {
const task = {
id: id++,
action: wrapAction(taskAction)
};
if (activeTasks === 0 && queue.length === 0) {
purgeOldTasks(now());
}
let promise;
if (isWithinLimits()) {
promise = startTask(task);
log(`Scheduling #${task.id}... started`);
} else {
promise = enqueueTask(task);
log(`Scheduling #${task.id}... queued`);
}
return promise;
};
}
/*******************************************************************/
// An example of an API endpoint
function getUserById(userId) {
return new Promise((resolve, reject) => {
https.get(`https://reqres.in/api/users/${userId}`, resp => {
let data = '';
resp.on('data', chunk => data += chunk);
resp.on('end', () => {
try {
resolve(JSON.parse(data));
} catch (error) {
reject(error);
}
});
}).on('error', reject);
});
}
/*******************************************************************/
// Usage example
// Create a scheduler and an array of promises
const schedule = createScheduler(2, 3);
const promises = [];
// Schedules {count} requests for users
function requestUsers(count) {
for (let i = 0; i < count; i++) {
// Generate a random user ID
const userId = 1 + Math.floor(Math.random() * 10);
// Schedule an API call and same the promise
const promise = schedule(() => getUserById(userId));
promises.push(promise);
}
}
// Schedule 10 requests for users right from the start
requestUsers(10);
// Then add 5 more requests for users after 1 second from the start
setTimeout(() => {
requestUsers(5);
}, 1000)
// Then add 2 more requests for users after 9 seconds from the start
setTimeout(() => {
requestUsers(2);
// Wait till all scheduled requests are completed
Promise.all(promises)
// Print the results
.then(results => {
console.log('Results:');
console.log(JSON.stringify(results, null, 4));
})
// Print errors
.catch(error => {
console.error(error);
})
.then(results => {
console.log('Done');
});
}, 9000)
@eng1n88r
Copy link

OMG!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment