Skip to content

Instantly share code, notes, and snippets.

@WietseWind WietseWind/pub.js
Last active Apr 9, 2019

Embed
What would you like to do?
Redis + Node PubSub vs. Task Publisher / Task Runner
// PubSub example, check pub.js and sub.js
// This is the publishing of messages
// Messages will only be seen by subscribers that are already listening
// Credits: https://thisdavej.com/guides/redis-node/node/pubsub.html
const Redis = require('ioredis')

// Connection used for publishing, created with the client's default options.
const pub = new Redis()
const channel = 'mychannel:123'

console.log('Started publisher...')

// Incremented before every publish and sent along as the `iterator` field.
let i = 0

// Publish one JSON-encoded message per second.
setInterval(() => {
  i++
  const payload = JSON.stringify({ iterator: i, message: 'Hi' })
  // The publish promise must not be left floating: without a rejection
  // handler a failed publish surfaces as an unhandled promise rejection.
  pub.publish(channel, payload).catch((error) => {
    console.error(`Failed to publish to ${channel}:`, error)
  })
}, 1000)
// PubSub example, check pub.js and sub.js
// This is the subscribing to messages
// Credits: https://thisdavej.com/guides/redis-node/node/pubsub.html
const Redis = require('ioredis')

// Connection used for the subscription and for receiving messages.
const redis = new Redis()

// Wildcard pattern to subscribe to.
// NOTE(review): this pattern does not match 'mychannel:123', the channel the
// pub.js example publishes to. Align the two if they are meant to run together.
const channel = 'garage:*'

// Use on('message'), (channel, message) for an exact channel match,
// use on('pmessage'), (sub, channel, message) for a wildcard channel match.
// Inside this listener `channel` shadows the outer constant: it is the second
// listener argument, not the subscription pattern declared above.
redis.on('pmessage', (sub, channel, message) => {
  console.log(`Received the following message from ${sub}, ${channel}`, message);
})

/**
 * Callback passed to psubscribe.
 *
 * @param {Error|null} error - Set when subscribing failed.
 * @param {number} count - Value reported by the client on success; logged as
 *   the number of subscribed channels.
 * @throws Rethrows `error` unchanged when subscribing failed.
 */
const handler = (error, count) => {
  // Rethrow the original error instead of wrapping it in a new Error, which
  // would stringify it and discard its type and stack trace.
  if (error) throw error
  console.log(`Subscribed to ${count} channel. Listening for updates on the ${channel} channel.`)
}

// Use subscribe for an exact channel match subscription
// Use psubscribe for a wildcard subscription
redis.psubscribe(channel, handler)
// You can unsubscribe with:
// redis.unsubscribe(channel)
// Or in case of a wildcard subscription:
// redis.punsubscribe(channel)
const Redis = require('ioredis')

// Connection used to push jobs onto the queue.
const pub = new Redis()

// Incremented before every push and used as the job suffix.
let i = 0

// Push one job per second onto the 'somequeue' list.
setInterval(() => {
  i++
  // Handle rejections explicitly so a failed push is logged instead of
  // surfacing as an unhandled promise rejection.
  pub.rpush('somequeue', `jobdetails_${i}`).catch((error) => {
    console.error('Failed to push job onto somequeue:', error)
  })
}, 1000)
// Get queue length: pub.llen('somequeue').then(...)
// Get queue item value: pub.lindex('somequeue', index || 0).then(...)
const Redis = require('ioredis')

// Connection used to pop jobs off the queue.
const tasks = new Redis()

// Pause before asking for work again after a failed blpop, so a persistent
// error does not turn into a tight retry loop.
const RETRY_DELAY_MS = 1000

/**
 * Waits for the next item on 'somequeue', logs it, and then waits for the
 * following one. If the blpop call fails, the error is logged and the wait
 * is retried after RETRY_DELAY_MS.
 */
const getSomethingToDo = () => {
  // blpop = 'blocking left pop': called with a timeout of 0 it waits
  // until an item comes in. Don't setInterval this,
  // invoking it once after processing the last job
  // is enough: it will resolve automatically
  // when a new job comes in.
  tasks
    .blpop('somequeue', 0)
    .then((d) => {
      console.log('somequeue item:', d)
      // Wait for the next message
      getSomethingToDo()
    })
    .catch((error) => {
      // Without this handler a single failed blpop would end the loop
      // with an unhandled promise rejection.
      console.error('Failed to fetch an item from somequeue:', error)
      setTimeout(getSomethingToDo, RETRY_DELAY_MS)
    })
}

getSomethingToDo()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.