Skip to content

Instantly share code, notes, and snippets.

@markotom
Created April 19, 2016 21:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save markotom/4c25d25a6508b97b93558d499c0d1d9b to your computer and use it in GitHub Desktop.
Save markotom/4c25d25a6508b97b93558d499c0d1d9b to your computer and use it in GitHub Desktop.
'use strict'
const assert = require('assert')
const Kue = require('kue')
// Queue instances created by Queue.broadcast, keyed by stringified prefix.
// Reusing them means repeated broadcasts do not construct a new Kue instance
// per prefix on every call. A Map is used so prefixes such as 'constructor'
// cannot collide with Object.prototype members.
const broadcastQueues = new Map()

/**
 * Thin wrapper around a Kue instance that namespaces job types with a prefix.
 * Every job type is built as `<prefix>.<queue>`.
 */
class Queue {
  /**
   * @param {Object} [options]
   * @param {string} [options.prefix='store'] - namespace prepended to queue names
   * @param {Object} [options.redis={}] - handed to Kue as its `redis` option
   */
  constructor (options) {
    options = options || {}
    this.prefix = options.prefix || 'store'
    this.redis = options.redis || {}
    this.kue = new Kue({
      prefix: 'q',
      redis: this.redis
    })
  }

  /**
   * Creates a job of type `<prefix>.<queue>` carrying `message` and saves it.
   *
   * Each own key of `options` that names a function on the job is called with
   * that option's value (e.g. `{ removeOnComplete: true }` results in
   * `job.removeOnComplete(true)`); keys that do not name a function are ignored.
   *
   * @param {string} queue - queue name, without the prefix
   * @param {*} message - job payload; must be truthy
   * @param {Object} [options] - job method name -> argument
   * @returns {*} whatever the job's `save()` returns
   * @throws {AssertionError} when `queue` is not a string, `message` is falsy,
   *   or `options` is not an object
   */
  push (queue, message, options) {
    const settings = options || {}
    assert.strictEqual(typeof queue, 'string', 'should be specified a queue name')
    assert(message, 'should be specified a queue message')
    assert(settings instanceof Object, 'options should be specified as object')
    let job = this.kue.create(`${this.prefix}.${queue}`, message)
    // Own keys only: inherited enumerable properties must not trigger job methods.
    for (const option of Object.keys(settings)) {
      if (typeof job[option] === 'function') {
        job = job[option](settings[option])
      }
    }
    return job.save()
  }

  /**
   * Registers a processor for jobs of type `<prefix>.<queue>`.
   *
   * `options` is only read; defaults are applied to local values so the
   * caller's object is never modified.
   *
   * @param {string} queue - queue name, without the prefix
   * @param {Function} [action] - called as `action(job, next)`; a non-function
   *   is replaced by a no-op
   * @param {Object} [options]
   * @param {number} [options.concurrency=1] - passed to `kue.process`
   * @param {number} [options.pause=1000] - passed to `ctx.pause`
   * @returns {Queue} this instance, for chaining
   * @throws {AssertionError} when `queue` is not a string
   */
  pull (queue, action, options) {
    assert.strictEqual(typeof queue, 'string', 'should be specified a queue name')
    const handler = typeof action === 'function' ? action : function noop () {}
    const settings = options || {}
    const concurrency = settings.concurrency || 1
    const pause = settings.pause || 1000
    this.kue.process(`${this.prefix}.${queue}`, concurrency, function (job, ctx, next) {
      // NOTE(review): the action only runs inside the pause callback and the
      // worker is resumed right after it is invoked, not after `next` is
      // called. Kept exactly as written; confirm this is the intended use of
      // Kue's ctx.pause/ctx.resume.
      ctx.pause(pause, function () {
        handler(job, next)
        ctx.resume()
      })
    })
    return this
  }

  /**
   * Pushes the same message onto `<prefix>.<queue>` for every given prefix.
   *
   * One Queue is created per distinct prefix and reused by later broadcast
   * calls, so calling this repeatedly does not keep creating Kue instances.
   *
   * @param {Array} prefixes - prefixes to publish to; duplicates produce one
   *   job each but share a Queue
   * @param {string} queue - queue name, without the prefix
   * @param {*} message - job payload
   * @param {Object} [options] - forwarded to `push`
   * @returns {Array} the `push` result for each prefix, in input order
   * @throws {AssertionError} when `prefixes` is not an array
   */
  static broadcast (prefixes, queue, message, options) {
    assert(Array.isArray(prefixes), 'prefixes should be specified as array')
    const results = []
    for (const prefix of prefixes) {
      const key = String(prefix)
      let target = broadcastQueues.get(key)
      if (!target) {
        target = new Queue({ prefix: prefix })
        broadcastQueues.set(key, target)
      }
      results.push(target.push(queue, message, options))
    }
    return results
  }
}
module.exports = Queue
'use strict'
const STORE_PREFIX = process.env.STORE_PREFIX || 'store'
const Queue = require('./queue')
// Worker: consumes `user.create` jobs under this store's prefix.
const queue = new Queue({ prefix: STORE_PREFIX })

// Logs the job payload, then completes the job with a result describing
// which store and queue handled it.
function handleUserCreate (job, done) {
  console.log(job.data)
  const result = {
    prefix: STORE_PREFIX,
    queue: 'user.create',
    model: job.data
  }
  done(null, result)
}

queue.pull('user.create', handleUserCreate, { concurrency: 10 })
'use strict'
const Queue = require('./queue')
const faker = require('faker')
// Producer: every 500 ms, broadcast a `user.create` job carrying a
// faker-generated name to each listed store prefix. The `removeOnComplete`
// option is forwarded with every job.
const publishRandomUser = () => {
  const prefixes = ['store1', 'store2']
  const user = {
    firstname: faker.name.firstName(),
    lastname: faker.name.lastName()
  }
  Queue.broadcast(prefixes, 'user.create', user, { removeOnComplete: true })
}

setInterval(publishRandomUser, 500)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment