Skip to content

Instantly share code, notes, and snippets.

@samsonjs
Created May 25, 2011 06:47
Show Gist options
  • Save samsonjs/990471 to your computer and use it in GitHub Desktop.
Save samsonjs/990471 to your computer and use it in GitHub Desktop.
Basic rate limiting queue for node
//// Usage
// Build a queue capped at 60 messages per minute, print every delivered
// message as indented JSON, then submit a single message.
var queue = createMessageQueue({ messagesPerMinute: 60 })
queue.on('message', function(delivered) {
  var pretty = JSON.stringify(delivered, null, 2)
  console.log('message: ' + pretty)
})
queue.enqueue({ to: 'avian', from: 'sjs', body: 'cool story bro' })
//// Implementation
// The simplest thing I could think of. A rolling tally of # messages sent
// each second, spanning one minute total, stored in a queue. Every second
// we dequeue the oldest count and push a count of 0 which is then updated
// as messages come in. The total in the queue at any given time must not
// exceed the specified maximum number of messages to send per minute.
// The default is a maximum of 20 messages per minute.
//
// When that limit is reached messages will be queued until the queue
// reaches the maximum specified size, after which messages are ignored. By
// default up to 10 messages will be queued.
//
// There is a volume knob that goes from 0 to 10 where 0 is mute and 10 is
// maximum chattiness. You can retrieve and set the volume so an external
// algorithm could adjust the volume as necessary.
var EventEmitter = require('events').EventEmitter
// a couple of utils
// Copy every enumerable property of `b` onto `a` (properties already on `a`
// are overwritten) and return `a` so the call can be used as an expression.
function mixin(a, b) {
  for (var k in b) a[k] = b[k]
  return a
}
// Restrict `n` to the range [min, max]: values above `max` become `max`,
// values below `min` become `min`, anything else is returned unchanged.
// The upper bound is checked first.
function clamp(n, min, max) {
  return n > max ? max : (n < min ? min : n)
}
// the good stuff
// Top of the volume scale: the volume knob ranges from 0 (mute) to MaxVolume.
var MaxVolume = 10
// Default option values for createMessageQueue.
, DefaultOptions = { messagesPerMinute: 20 // send at most N messages per minute
, queueSize: 10 // queue up to N messages before ignoring new ones
}
// Create a rate limited message queue.
//
// The returned object is an EventEmitter that emits a 'message' event (with
// the message as its only argument) for every message it delivers. Messages
// are delivered in the order they were enqueued. A message is delivered
// immediately while the number of messages sent during the last minute is
// below the current limit; otherwise it waits in the queue.
//
// Options (each falls back to DefaultOptions when omitted):
//   messagesPerMinute - maximum number of messages delivered per minute at
//                       full volume
//   queueSize         - maximum number of messages waiting for delivery;
//                       further messages are rejected
//
// The effective limit is round(volume / MaxVolume * messagesPerMinute).
//
// Methods on the returned queue:
//   enqueue(msg) - returns true if the message was accepted, false if the
//                  queue is full or stopped
//   isEmpty()    - true when no messages are waiting
//   isFull()     - true when queueSize messages are waiting
//   mute()       - set the volume to 0, remembering the previous volume
//   unmute()     - restore the volume saved by mute()
//   volume([n])  - with no argument returns the volume; with an argument
//                  sets it (clamped to 0..MaxVolume), throws the string
//                  'volume does not go to 11' if n is not numeric
//   start()      - (re)start the queue at full volume with a fresh tally;
//                  throws the string 'queue already started' when running
//   stop()       - stop the timer and discard all waiting messages
//
// The queue is started before it is returned. While it is running it holds
// an interval timer (exposed as `timeout`); call stop() to release it.
function createMessageQueue(options) {
  // Defaults first, caller's values on top. Neither DefaultOptions nor the
  // caller's object is modified. mixin's return value is deliberately unused.
  var settings = {}
  mixin(settings, DefaultOptions)
  mixin(settings, options || {})

  var absoluteMaxMessagesPerMinute = settings.messagesPerMinute
    , maxQueueSize = settings.queueSize
    , currentMaxMessagesPerMinute = absoluteMaxMessagesPerMinute
    , messagesSentThisMinute = 0
    , messagesSentEachSecond = [0] // per-second tallies: newest at index 0, oldest last
    , queue = [] // waiting messages, oldest first
    , limited = false
    , volume = MaxVolume
    , preMuteVolume = volume // used to restore the volume on unmute
    , self = new EventEmitter()

  // Recompute the effective limit from the current volume and tally.
  function updateLimit() {
    currentMaxMessagesPerMinute = Math.round((volume / MaxVolume) * absoluteMaxMessagesPerMinute)
    limited = messagesSentThisMinute >= currentMaxMessagesPerMinute
  }

  // Deliver waiting messages, oldest first, until the limit is reached or
  // nothing is left. Does nothing while the queue is stopped.
  function drain() {
    if (!self.timeout) return
    updateLimit()
    while (!limited && queue.length > 0) {
      var msg = queue.shift()
      // Count the message before emitting so a listener that re-enters the
      // queue (enqueue, stop, ...) sees a consistent tally.
      messagesSentEachSecond[0] += 1
      messagesSentThisMinute += 1
      self.emit('message', msg)
      updateLimit()
    }
  }

  // Runs once per second: drop the tally that is now a minute old, open a
  // new one for the coming second, and deliver whatever the freed budget
  // allows.
  function tick() {
    if (messagesSentEachSecond.length === 60) {
      messagesSentThisMinute -= messagesSentEachSecond.pop()
    }
    messagesSentEachSecond.unshift(0)
    drain()
  }

  self.enqueue = function(msg) {
    if (!self.timeout || self.isFull()) return false
    queue.push(msg)
    drain()
    return true
  }

  self.isEmpty = function() {
    return queue.length === 0
  }

  self.isFull = function() {
    return queue.length >= maxQueueSize
  }

  self.mute = function() {
    if (volume > 0) {
      preMuteVolume = volume
      volume = 0
    }
    updateLimit()
  }

  self.unmute = function() {
    if (volume === 0) volume = preMuteVolume
    drain()
  }

  self.volume = function(newVolume) {
    if (typeof newVolume === 'undefined') return volume
    var n = +newVolume
    if (isNaN(n)) throw 'volume does not go to 11'
    volume = clamp(n, 0, MaxVolume)
    // A higher volume may free up budget for messages that are waiting.
    drain()
  }

  self.start = function() {
    if (self.timeout) throw 'queue already started'
    limited = false
    volume = MaxVolume
    messagesSentThisMinute = 0
    messagesSentEachSecond = [0]
    // An interval, not a one-shot timeout: the window must advance every
    // second for as long as the queue is running.
    self.timeout = setInterval(tick, 1000)
  }

  self.stop = function() {
    if (!self.timeout) return
    clearInterval(self.timeout)
    delete self.timeout
    queue = []
    // Keep the tally in a valid state so nothing dereferences null if the
    // queue is touched while stopped.
    messagesSentEachSecond = [0]
    messagesSentThisMinute = 0
  }

  self.start()
  return self
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment