Skip to content

Instantly share code, notes, and snippets.

@radzhome
Created Apr 16, 2019
Embed
What would you like to do?
redis pub sub
"""
Pub/sub is a pattern used in scaling software its important to understand what sort of
scaling it helps with. An important distinction is between pub/sub and message queueing.
In the queueing pattern, a queue (or list in Redis) buffers the messages to be processed while
a pool of workers pops items off the list and handles them. In this model, scaling the size of
your pool of workers scales the speed with which you process your queue, because each message
is passed to only one worker. All of the workers handle any given message in the exact same way.
In pub/sub on the other hand, the system attempts to deliver all of a channels messages to all
of its subscribers. Its a many-to-many pattern, where each of the different subscribers do something
unique with the message one writes it to a durable log, one sends it to a Slack channel one rings
a bell in a local sales office, etc.
In short, pub/sub scales message delivery, and queueing scales message workload processing.
Redis is frequently used for both of these goals.
https://www.redisgreen.net/blog/pubsub-intro/
"""
from multiprocessing import Process
import time
import redis
# Publisher
def pub(kache):
for n in range(10):
kache.publish('channel1', 'blah {}'.format(n))
time.sleep(5)
def pub2(kache):
for n in range(10):
kache.publish('channel2', 'blah {}'.format(n))
time.sleep(5)
# Some reader
def sub(kache, name):
pubsub = kache.pubsub()
pubsub.subscribe(['channel1'])
for item in pubsub.listen():
print("reader: {}, data: {}".format(name, item['data']))
# Some other reader
def sub2(kache, name):
pubsub = kache.pubsub()
pubsub.subscribe(['channel2'])
for item in pubsub.listen():
print("reader: {}, data: {}".format(name, item['data']))
if __name__ == '__main__':
redis_kache = redis.Redis()
Process(target=pub, args=(redis_kache, )).start()
Process(target=pub2, args=(redis_kache, )).start()
Process(target=sub, args=(redis_kache, 'reader1')).start()
Process(target=sub2, args=(redis_kache, 'reader2')).start()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment