Skip to content

Instantly share code, notes, and snippets.

@markotom
Last active April 22, 2016 02:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save markotom/5da7c54bb3fee3fb8dd23c159a1809f0 to your computer and use it in GitHub Desktop.
Save markotom/5da7c54bb3fee3fb8dd23c159a1809f0 to your computer and use it in GitHub Desktop.
'use strict'
const Bull = require('bull')
const co = require('co')
const EventEmitter = require('events')
class Queue extends EventEmitter {
constructor (options) {
super()
options = options || {}
this.redis = options.redis || {}
this.redis.port = this.redis.port || 6379
this.redis.host = this.redis.host || '127.0.0.1'
this.redis.options = this.redis.options || {}
this.queues = {}
}
pull (queueName, callback) {
let queue = this.queue(queueName)
if (!queue.handler) {
let ctx = this
queue.process(function (queue, done) {
co(function * () {
yield callback.call(ctx, queue)
})
.then(function (result) {
done(null, result || null)
})
.catch(done)
})
}
return this
}
push (queueName, message) {
let queue = this.queue(queueName)
queue.add(message)
return this
}
queue (queueName) {
if (!this.queues[queueName]) {
this.queues[queueName] = Bull(queueName, this.redis.port, this.redis.host, this.redis.options)
}
return this.queues[queueName]
}
}
module.exports = Queue
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment