Skip to content

Instantly share code, notes, and snippets.

@markotom
Last active April 21, 2016 06:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save markotom/97f3d2a4f2d99705b2e7076b1a9a9800 to your computer and use it in GitHub Desktop.
Save markotom/97f3d2a4f2d99705b2e7076b1a9a9800 to your computer and use it in GitHub Desktop.
'use strict'
const assert = require('assert')
const EventEmmiter = require('events')
class SimpleQueue extends EventEmmiter {
constructor (options) {
super()
options = options || {}
assert(options.redis, 'should be specified a redis client')
this.prefix = options.prefix || 'q'
this.pattern = new RegExp(`^${this.prefix}\.(.+)`)
this.client = options.redis
this.on('message', (queue, message) => {
let name = queue.match(this.pattern)[1]
this.emit(name, JSON.parse(message))
})
}
push (queue, message) {
assert.equal(typeof queue, 'string', 'should be specified a queue name as string')
assert(message, 'message should be specified')
this.client.lpush(queue, JSON.stringify(message))
return this
}
pull (queue, callback) {
assert.equal(typeof queue, 'string', 'should be specified a queue name')
assert.equal(typeof callback, 'function', 'should be specified a callback')
this.on.apply(this, arguments)
return this
}
listen () {
assert(arguments.length > 0, 'should be specified at least one queue to listen')
let args = []
for (let a in arguments) {
args[a] = `${this.prefix}.${arguments[a]}`
}
let queues = [].slice.call(args, 0)
this.client.brpop(queues, 0, (err, replies) => {
try {
if (err) {
return this.emit('error', err)
}
if (replies.length !== 2) {
return this.emit('error', new Error('Bad replies length'))
}
this.emit('message', replies[0], replies[1])
} finally {
this.listen.apply(this, arguments)
}
})
return this
}
broadcast (prefixes, queue, message) {
assert(Array.isArray(prefixes), 'should be specified prefixes as array')
assert.equal(typeof queue, 'string', 'should be specified a queue name as string')
assert(message, 'should be specified a message')
for (let prefix of prefixes) {
this.push(`${prefix}.${queue}`, message)
}
return this
}
}
module.exports = SimpleQueue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment