Last active
April 21, 2016 06:02
-
-
Save markotom/97f3d2a4f2d99705b2e7076b1a9a9800 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict' | |
const assert = require('assert') | |
const EventEmmiter = require('events') | |
/**
 * Small Redis-list-backed message queue exposed as an event emitter.
 *
 * Messages are JSON-serialized and pushed with `lpush`; `listen` blocks on
 * `brpop` and re-emits each decoded message as an event named after the queue
 * (the key with the `<prefix>.` part stripped).
 *
 * Caution: queue names share the emitter's event namespace with the internal
 * `message` event and the `error` event, so queues should not use those names.
 */
class SimpleQueue extends EventEmmiter {
  /**
   * @param {Object} options
   * @param {Object} options.redis - Redis client; this class calls its
   *   `lpush(key, value)` and `brpop(keys, timeout, callback)` methods.
   * @param {string} [options.prefix='q'] - Namespace that `listen` prepends
   *   (followed by a dot) to every queue name.
   * @throws {AssertionError} If no redis client is given.
   */
  constructor (options) {
    super()
    options = options || {}
    assert(options.redis, 'should be specified a redis client')

    this.prefix = options.prefix || 'q'
    // The prefix is user data, not a pattern: escape regex metacharacters so a
    // prefix such as "q+" or "a(b" matches literally, and escape the separator
    // dot (a bare "\." inside a template literal collapses to an unescaped ".").
    const escapedPrefix = String(this.prefix).replace(/[.*+?^${}()|[\]\\]/g, '\\$&')
    this.pattern = new RegExp(`^${escapedPrefix}\\.(.+)`)
    this.client = options.redis

    // Internal dispatch: translate a raw (key, payload) pair into a per-queue event.
    this.on('message', (queue, message) => {
      const match = this.pattern.exec(queue)
      if (!match) {
        this.emit('error', new Error(`Unexpected queue key: ${queue}`))
        return
      }

      let payload
      try {
        payload = JSON.parse(message)
      } catch (err) {
        // Report malformed payloads through the emitter rather than throwing
        // out of the Redis callback.
        this.emit('error', err)
        return
      }

      // Emitted outside the try so that exceptions thrown by user listeners
      // are not mistaken for parse failures.
      this.emit(match[1], payload)
    })
  }

  /**
   * Serialize `message` as JSON and push it onto the list `queue`.
   * The key is used exactly as given: no prefix is added here.
   *
   * @param {string} queue - Full Redis key of the target list.
   * @param {*} message - Truthy, JSON-serializable value.
   * @returns {SimpleQueue} this, for chaining.
   * @throws {AssertionError} If `queue` is not a string or `message` is falsy.
   */
  push (queue, message) {
    assert.strictEqual(typeof queue, 'string', 'should be specified a queue name as string')
    assert(message, 'message should be specified')
    this.client.lpush(queue, JSON.stringify(message))
    return this
  }

  /**
   * Register a handler for messages arriving on `queue`.
   *
   * @param {string} queue - Queue name without prefix (the event name).
   * @param {Function} callback - Called with the decoded message.
   * @returns {SimpleQueue} this, for chaining.
   * @throws {AssertionError} If the argument types are wrong.
   */
  pull (queue, callback) {
    assert.strictEqual(typeof queue, 'string', 'should be specified a queue name')
    assert.strictEqual(typeof callback, 'function', 'should be specified a callback')
    this.on(queue, callback)
    return this
  }

  /**
   * Start consuming the given queues. Each name is prefixed with
   * `<prefix>.` and passed to a blocking `brpop` with no timeout; after every
   * reply (or error) the call is issued again with the same queues.
   *
   * @param {...string} queues - Queue names without prefix; at least one.
   * @returns {SimpleQueue} this, for chaining.
   * @throws {AssertionError} If called without queue names.
   */
  listen (...queues) {
    assert(queues.length > 0, 'should be specified at least one queue to listen')
    const keys = queues.map((name) => `${this.prefix}.${name}`)

    this.client.brpop(keys, 0, (err, replies) => {
      try {
        if (err) {
          this.emit('error', err)
          return
        }
        // Guard against a missing reply as well as a malformed one.
        if (!Array.isArray(replies) || replies.length !== 2) {
          this.emit('error', new Error('Bad replies length'))
          return
        }
        this.emit('message', replies[0], replies[1])
      } finally {
        // Re-arm regardless of the outcome so consumption continues.
        this.listen(...queues)
      }
    })
    return this
  }

  /**
   * Push the same message onto `<prefix>.<queue>` for every given prefix.
   *
   * @param {string[]} prefixes - Target namespaces.
   * @param {string} queue - Queue name without prefix.
   * @param {*} message - Truthy, JSON-serializable value.
   * @returns {SimpleQueue} this, for chaining.
   * @throws {AssertionError} If the argument types are wrong or `message` is falsy.
   */
  broadcast (prefixes, queue, message) {
    assert(Array.isArray(prefixes), 'should be specified prefixes as array')
    assert.strictEqual(typeof queue, 'string', 'should be specified a queue name as string')
    assert(message, 'should be specified a message')
    for (const prefix of prefixes) {
      this.push(`${prefix}.${queue}`, message)
    }
    return this
  }
}
module.exports = SimpleQueue |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment