Skip to content

Instantly share code, notes, and snippets.

@rkok
Created August 14, 2023 04:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rkok/67e2d59c6bfbee85df88d0d76344dc66 to your computer and use it in GitHub Desktop.
Save rkok/67e2d59c6bfbee85df88d0d76344dc66 to your computer and use it in GitHub Desktop.
Rate-limit-aware, multi-interface wrapper for Axios (HTTPS requests only)
/*
* Disclaimer: this was slung together as the result of some trial and error and can definitely be improved.
*
* Set the DEBUG environment variable to any non-empty value (e.g. DEBUG=1) for lots of debug logging.
*
* Usage example:
*
* const rateLimits: RateLimitDef[] = [
* {
* urlRegex: 'carriages/[^/]+/horses',
* reqsPerMinute: 10
* }
* ];
*
* const client = new MultiHttps({
* rateLimits,
* interfaces: ['1.2.3.4', '2.3.4.5']
* });
*
* await Promise.all([1,2,3,4].map(n => client.enqueue({ url: `https://example.com/carriages/${n}/horses` })));
*
*/
import axios, {AxiosResponse, AxiosRequestConfig, AxiosInstance} from "axios";
import chalk from "chalk";
import moment from "moment";
import {v4 as uuidv4} from 'uuid';
import logger from "../logger";
const https = require('https');
/**
 * Builds an object shaped like Node's `https` module whose `request` function
 * forces outgoing connections to originate from `localAddress`.
 *
 * Every other member of `https` is copied over unchanged. When `localAddress`
 * is falsy (e.g. an empty string) the request options are forwarded untouched.
 *
 * Source: https://github.com/axios/axios/issues/2808#issuecomment-800141018
 *
 * @param localAddress Local IP address to bind to. An address containing ':'
 *                     selects address family 6, anything else family 4.
 * @returns A copy of `https` with `request` replaced by the binding wrapper.
 */
const getWrappedHttpsTransport = ({localAddress}: { localAddress: string }): any => {
    const request = (options: any, callback: any) => {
        // Applied after the caller's options so the binding always wins.
        const binding = localAddress
            ? {localAddress, family: localAddress.includes(':') ? 6 : 4}
            : {};
        return https.request({...options, ...binding}, callback);
    };
    return {...https, request};
};
/** A rate limit applied to every request whose URL matches `urlRegex`. */
export type RateLimitDef = {
    /** Regular-expression source; compiled with `new RegExp(...)` and matched against the request URL. */
    urlRegex: string;
    /** Maximum number of requests per minute, per interface, for matching URLs. */
    reqsPerMinute: number;
};
/** One outbound interface together with its dedicated Axios client and scheduling state. */
type Thread = {
    /** Local address this thread sends from, or 'default' when no address is bound. */
    interface: string;
    /** Axios instance used for every request issued on this thread. */
    client: AxiosInstance;
    /**
     * 'free'    - idle, may be handed the next queued job;
     * 'waiting' - holding a job until the rate-limit spacing has elapsed;
     * 'running' - a request is in flight;
     * 'backoff' - sleeping after the API answered HTTP 429.
     */
    status: 'backoff' | 'running' | 'waiting' | 'free';
    /** Epoch milliseconds (compared against `Date.now()`) before which the next request should not start. */
    nextAvailableTime: number;
};
/** Axios request config extended with this wrapper's retry settings. */
type RequestConfig<D> = AxiosRequestConfig<D> & {
    /** Seconds to pause after an HTTP 429 response before the next attempt (defaults to 60 when omitted). */
    backoffSecs?: number;
    /** Total number of attempts made before giving up (defaults to 3 when omitted). */
    retries?: number;
};
/** Per-instance thread pools, keyed by the owning MultiHttps instance's `instanceId`. */
const threads: Record<string, Thread[]> = {};
/**
 * Rate-limit-aware, multi-interface wrapper for Axios (HTTPS requests only).
 *
 * Every entry of `interfaces` gets its own Axios client (a "thread"). Jobs added
 * through `enqueue()` are stored in a FIFO queue; a polling loop started by the
 * constructor hands the oldest job to a free thread. Each thread spaces its
 * requests according to the matching `RateLimitDef`, backs off after HTTP 429
 * responses and retries timed-out requests.
 *
 * NOTE: the polling loop (`watchQueue`) never terminates, and the instance's
 * thread pool is never removed from the module-level `threads` map.
 */
class MultiHttps {
    /** Key under which this instance's threads are stored in the module-level `threads` map. */
    instanceId: string;
    /** Rate limits consulted (first match wins) for every request. */
    rateLimits: RateLimitDef[] = [];
    /** Pending jobs; each is invoked with the index of the thread chosen to run it. */
    queue: ((threadIndex: number) => Promise<any>)[] = [];
    /** Single-slot semaphore guarding each dispatch step of `watchQueue`. */
    queueSemaphore = require('semaphore')(1);

    /**
     * @param rateLimits URL-pattern based rate limits.
     * @param interfaces Local addresses to send from; the special value 'default'
     *                   creates a plain Axios client without address binding.
     */
    constructor({rateLimits, interfaces = ['default']}: {
        rateLimits: RateLimitDef[],
        interfaces: string[]
    }) {
        this.instanceId = uuidv4();
        this.rateLimits = rateLimits;
        threads[this.instanceId] = interfaces.map(localAddress => {
            const clientOpts = localAddress === 'default' ? {} : {
                // @ts-ignore See https://github.com/axios/axios/issues/5431#issue-1520478553
                transport: getWrappedHttpsTransport({localAddress})
            } as AxiosRequestConfig;
            return {
                interface: localAddress,
                client: axios.create(clientOpts),
                status: 'free',
                nextAvailableTime: 0
            }
        });
        this.watchQueue();
    }

    /**
     * Dispatch loop: roughly every 5 ms, hands the oldest queued job to a free
     * thread (if there is both a job and a free thread). Runs forever.
     */
    async watchQueue() {
        while (true) {
            await new Promise((resolve) => {
                this.queueSemaphore.take(() => {
                    const threadIndex = this.getNextAvailableThreadIndex();
                    if (threadIndex !== undefined) {
                        const job = this.queue.shift();
                        if (job) {
                            if (process.env.DEBUG) console.debug(Date.now(), `found free thread ${threadIndex}, pushing next job`);
                            // `request()` marks the thread as busy synchronously, so the
                            // next poll cannot pick the same thread again.
                            job(threadIndex)
                                .catch(() => { /* Do nothing; outside listeners will handle it */
                                });
                        }
                    }
                    this.queueSemaphore.leave();
                    setTimeout(resolve, 5);
                });
            });
        }
    }

    /**
     * Queues a request and resolves/rejects with its eventual outcome.
     *
     * @param config Axios request config plus optional `backoffSecs` / `retries`.
     * @returns The Axios response once a thread has executed the request.
     */
    async enqueue<T = any, R = AxiosResponse<T>, D = any>(config: RequestConfig<D>): Promise<R> {
        if (process.env.DEBUG) console.debug(Date.now(), `enqueueing`);
        return new Promise((resolve, reject) => {
            this.queue.push((threadIdx) => {
                return this.request<T, R, D>(config, threadIdx)
                    .then(resolve)
                    .catch(e => reject(e));
            });
        });
    }

    /**
     * Executes a request on the given thread, honouring the rate-limit spacing
     * that applies to its URL.
     *
     * HTTP 429 responses trigger a pause of `backoffSecs` (default 60) before the
     * next attempt; Axios timeouts are retried immediately. Any other error is
     * rethrown at once and no further attempts are made.
     *
     * @param config      Axios request config plus optional `backoffSecs` / `retries`
     *                    (`retries` is the total number of attempts, default 3).
     * @param threadIndex Index into this instance's thread pool.
     * @returns The Axios response of the first successful attempt.
     * @throws The original error for non-retryable failures, or an `Error`
     *         ("Giving up on API call ...") once all attempts are used up.
     */
    async request<T = any, R = AxiosResponse<T>, D = any>(config: RequestConfig<D>, threadIndex: number): Promise<R> {
        const thread = threads[this.instanceId][threadIndex];
        // Must happen before the first `await` so the dispatcher sees the thread as busy.
        thread.status = 'running';

        const backoffSecs = config?.backoffSecs ?? 60;
        const retries = config?.retries ?? 3;
        const now = Date.now();
        const nextTime = thread.nextAvailableTime;
        const url = (config.baseURL ?? '') + (config?.url ?? '');
        const distancingMs = this.getMinMsBetweenRequests(url);
        const waitMs = (now < nextTime) ? nextTime - now : 0;
        if (process.env.DEBUG) {
            this.dumpStatus(threadIndex, url);
            console.debug({penaltyMs: distancingMs, now, nextTime, waitMs})
        }
        // Reserve this thread's next slot: start time of this request plus the required spacing.
        thread.nextAvailableTime = Math.max(nextTime, now + waitMs) + distancingMs;
        const addr = this.getInterfaceAddrPretty(threadIndex);

        const run = async (): Promise<R> => {
            if (process.env.DEBUG) console.debug(Date.now(), `${url} starting`);
            for (let attempt = 1; attempt <= retries; attempt++) {
                let backoff = false;
                try {
                    return await thread.client.request<T, R, D>(config);
                } catch (e: any) {
                    if (process.env.DEBUG) console.debug(Date.now(), `caught err ${e?.message}`);
                    // Throwing (rather than flagging and continuing) guarantees that a
                    // non-retryable failure ends the loop instead of re-sending the request.
                    if (!axios.isAxiosError(e)) throw e;
                    const attemptPrefix = `[Attempt ${attempt}/${retries}] ${addr}`;
                    if (e.response?.status === 429) {
                        logger.warn(`${attemptPrefix} Request throttled by API, backing off for ${backoffSecs}s`, url);
                        backoff = true;
                    } else if (e.code === 'ECONNABORTED' && e.message.includes('timeout')) {
                        logger.warn(`${attemptPrefix} Request timed out, retrying`);
                    } else {
                        throw e;
                    }
                } finally {
                    if (process.env.DEBUG) console.debug(Date.now(), `${url} response received`);
                }
                if (backoff) {
                    // The pause also applies after the final attempt, which keeps the
                    // throttled thread out of rotation for `backoffSecs`.
                    thread.status = 'backoff';
                    await new Promise(resolve => setTimeout(resolve, backoffSecs * 1000));
                    thread.status = 'running';
                }
            }
            throw new Error(`Giving up on API call after ${retries} attempts`);
        };

        try {
            if (waitMs > 0) {
                thread.status = 'waiting';
                await new Promise(resolve => setTimeout(resolve, waitMs));
                thread.status = 'running';
            }
            return await run();
        } finally {
            if (process.env.DEBUG) console.debug('promise finally!');
            thread.status = 'free';
        }
    }

    /**
     * Looks up the minimum spacing between two requests to `url` on one thread.
     *
     * @returns `60000 / reqsPerMinute` of the first rate limit whose regex
     *          matches `url`, or 0 when none matches.
     */
    getMinMsBetweenRequests(url: string): number {
        for (const rl of this.rateLimits) {
            if (new RegExp(rl.urlRegex).test(url)) {
                if (process.env.DEBUG) console.debug(Date.now(), `setting rate limit to ${rl.reqsPerMinute} req/minute for ${url}`);
                return 60000 / rl.reqsPerMinute;
            }
        }
        return 0;
    }

    /**
     * Picks the free thread with the earliest `nextAvailableTime`.
     *
     * Only threads whose status is 'free' are considered, so a busy thread that
     * happens to share the same timestamp is never returned. Ties go to the
     * lowest index.
     *
     * @returns The thread index, or `undefined` when no thread is free.
     */
    getNextAvailableThreadIndex(): number | undefined {
        const pool = threads[this.instanceId];
        let best: number | undefined;
        for (let i = 0; i < pool.length; i++) {
            const candidate = pool[i];
            if (candidate.status !== 'free') continue;
            if (best === undefined || candidate.nextAvailableTime < pool[best].nextAvailableTime) {
                best = i;
            }
        }
        return best;
    }

    /**
     * Returns the thread's interface address, colourised by its current status
     * (free: green, waiting: bright yellow, running: yellow, backoff: red).
     */
    getInterfaceAddrPretty(idx: number): string {
        const thread = threads[this.instanceId][idx];
        switch (thread.status) {
            case 'free':
                return chalk.green(thread.interface);
            case 'waiting':
                return chalk.yellowBright(thread.interface);
            case 'running':
                return chalk.yellow(thread.interface);
            case 'backoff':
                return chalk.red(thread.interface);
            default:
                return thread.interface;
        }
    }

    /**
     * Writes a one-line-per-thread status overview to stdout, highlighting the
     * thread chosen for `url`. Used for debug output only.
     */
    dumpStatus(chosenIdx: number, url: string) {
        let output = '';
        threads[this.instanceId].forEach((thread, i) => {
            const addr = this.getInterfaceAddrPretty(i);
            const addrFull = `${addr} N:${moment(thread.nextAvailableTime).format('HH:mm:ss')}`
            if (i === chosenIdx) {
                output += chalk.bold(addrFull) + ` <-- ${url}\n`;
            } else {
                output += `${addrFull}\n`
            }
        });
        process.stdout.write(`-----\n${output}-----\n\n`);
    }
}
export default MultiHttps;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment