Skip to content

Instantly share code, notes, and snippets.

@val314159
Forked from quiver/README.md
Created December 9, 2018 19:21
Show Gist options
  • Save val314159/bef76345505ca787bebc29af7f702e76 to your computer and use it in GitHub Desktop.
Save val314159/bef76345505ca787bebc29af7f702e76 to your computer and use it in GitHub Desktop.
Who says PostgreSQL can't Pub/Sub like Redis?

Pub/Sub pattern with PostgreSQL's LISTEN/NOTIFY command

This is a simple chat-like program that uses the pub/sub pattern, backed by PostgreSQL's LISTEN/NOTIFY commands.

Publish

Publish messages to the foo channel as user nickname.

$ python pub.py foo nickname
PUBLISH to channel #foo
test message
what's up?
<Ctrl-D>
$

Publish a message to the bar channel as user mike.

$ python pub.py bar mike
PUBLISH to channel #bar
pub/sub rules!
<Ctrl-D>
$

Subscribe

Subscribe to both the foo and bar channels.

$ python sub.py foo bar
SUBSCRIBE TO channel #foo
SUBSCRIBE TO channel #bar
#foo - [nickname]:test message
#foo - [nickname]:what's up?
#bar - [mike]:pub/sub rules!
<Ctrl-C>
UNSUBSCRIBE FROM channel #foo
UNSUBSCRIBE FROM channel #bar
$
*.pyc
__pycache__
# -*- coding: utf-8 -*-
"""
pub/sub helpers (publish and subscribe) backed by PostgreSQL's LISTEN/NOTIFY
"""
import os, sys, types, psycopg2
# ``types.StringTypes`` only exists on Python 2; fall back to ``str`` elsewhere.
_STRING_TYPES = getattr(types, 'StringTypes', (str,))


def is_str(s):
    """
    return True when <s> is a string (e.g. a file name), False otherwise
    """
    # isinstance, not ``s in <tuple of types>``: the membership test compares
    # <s> against the type objects themselves and is never true for a string.
    return isinstance(s, _STRING_TYPES)


def wopen(f):
    """
    return a writable file for <f>

    A non-empty string is opened for writing, an already open file is handed
    back unchanged, and anything falsy (None, '') selects sys.stdout.
    """
    if f and is_str(f):
        return open(f, 'w')
    return f or sys.stdout


def ropen(f):
    """
    return a readable file for <f>

    A non-empty string is opened for reading, an already open file is handed
    back unchanged, and anything falsy (None, '') selects sys.stdin.
    """
    if f and is_str(f):
        return open(f, 'r')
    return f or sys.stdin
__version__ = "1.0.0.1"

# Connection string; the DSN environment variable overrides the default.
DSN = os.environ.get("DSN", "dbname=postgres")

# A single connection and cursor are created at import time and shared by
# every function in this module.  The connection is switched to autocommit
# so that each statement takes effect without an explicit commit.
conn = psycopg2.connect(DSN)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()
def pub(channel, nickname, message):
    """
    publish <message> to <channel> as <nickname>

    The payload sent is ``[<nickname>]:<message>``.
    """
    chat_message = '[%s]:%s' % (nickname, message)
    # The payload goes through the driver's parameter binding, which quotes
    # it correctly on both Python 2 and 3 (formatting the bytes returned by
    # QuotedString.getquoted() with %s produces "b'...'" on Python 3).
    # NOTE(security): NOTIFY takes the channel as an SQL identifier, which
    # cannot be a bound parameter, so <channel> is still spliced in verbatim
    # and must come from a trusted source.
    statement = "NOTIFY %s, %%s;" % (channel,)
    curs.execute(statement, (chat_message,))
def sub(channel):
    """
    announce the subscription on stdout, then LISTEN on <channel>
    """
    announcement = 'SUBSCRIBE TO channel #%s' % channel
    print(announcement)
    listen_sql = "LISTEN %s;" % channel
    curs.execute(listen_sql)
def uns(channel):
    """
    announce the unsubscription on stdout, then UNLISTEN on <channel>
    """
    announcement = 'UNSUBSCRIBE FROM channel #%s' % channel
    print(announcement)
    unlisten_sql = "UNLISTEN %s;" % channel
    curs.execute(unlisten_sql)
def notifications():
    """
    iterator for notifications

    Polls the connection once, then yields every pending notification in
    the order it was received.
    """
    conn.poll()
    while conn.notifies:
        # pop(0) hands out the oldest entry first; a bare pop() takes the
        # newest one and would deliver a batch in reverse order.
        yield conn.notifies.pop(0)
def subs_loop(channels, callback_func, poll_timeout=5.0):
    """
    subscribe to <channels> and send notifies to <callback_func>

    Runs until interrupted.  <channels> is an iterable of channel names;
    <callback_func> is called once per notification.  <poll_timeout> is the
    longest time, in seconds, to wait for socket activity before polling
    the connection anyway.  On Ctrl-C every channel is unsubscribed and the
    KeyboardInterrupt is re-raised.
    """
    import select

    # Materialise once: the channels are walked again when unsubscribing.
    channels = list(channels)
    for channel in channels:
        sub(channel)
    try:
        while True:
            # Sleep until the connection's socket becomes readable instead of
            # spinning on poll(); the timeout bounds the wait so notifications
            # already queued on the connection are still picked up.
            select.select([conn], [], [], poll_timeout)
            for n in notifications():
                callback_func(n)
    except KeyboardInterrupt:
        for channel in channels:
            uns(channel)
        raise
def tap(channels, out_file_spec=None):
    """
    tap <channels> and send to out_file

    <out_file_spec> is handed to wopen() to obtain the output file.  Each
    notification is written as ``[<pid>] #<channel> - <payload>`` and
    flushed immediately.  A file opened here is closed when the loop ends.
    """
    out_file = wopen(out_file_spec)
    # Only close what this call opened: never the caller's file or stdout.
    opened_here = (out_file is not out_file_spec
                   and out_file is not sys.stdout)

    def cb(n):
        s = "[%s] #%s - %s" % (n.pid, n.channel, n.payload)
        out_file.write(s)
        # The loop runs indefinitely, so without a flush the output can sit
        # in the buffer and never reach a file or pipe reader.
        out_file.flush()

    try:
        return subs_loop(channels, cb)
    finally:
        if opened_here:
            out_file.close()
def drain(channel, nickname, in_file_spec=None):
    """
    drain (publish in_file) as <nickname> to <channel>

    <in_file_spec> is handed to ropen() to obtain the input file.  Every
    line read from it is published as one message, line ending included.
    A file opened here is closed once the input is exhausted.
    """
    print("PUBLISH to channel #%s" % channel)
    in_file = ropen(in_file_spec)
    # Only close what this call opened: never the caller's file or stdin.
    opened_here = (in_file is not in_file_spec
                   and in_file is not sys.stdin)
    try:
        for line in in_file:
            pub(channel, nickname, line)
    finally:
        if opened_here:
            in_file.close()
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
command-line publisher for the pub/sub pattern

Forwards the command-line arguments to ps.drain().

Usage:
python -m ps.drain <channel> <nickname>
"""
import sys

import ps

if __name__ == '__main__':
    cli_args = sys.argv[1:]
    ps.drain(*cli_args)
# Remove editor backups, compiled bytecode and bytecode caches, then list
# what is left.  `-exec ... {} +` passes file names straight to rm, so names
# containing spaces or quotes are handled (unlike `find | xargs`).
.PHONY: clean
clean::
	find . \( -name '*~' -o -name '.*~' \) -exec rm -f {} +
	find . -name '*.pyc' -exec rm -f {} +
	# -prune keeps find from descending into a directory that is being removed
	find . -name __pycache__ -prune -exec rm -fr {} +
	tree .
import sys, ps
def process(n):
    """
    write one notification to stdout as ``[<pid>] #<channel> - <payload>``
    """
    sys.stdout.write("[%s] #%s - %s" % (n.pid, n.channel, n.payload))


# The guard must compare against '__main__' (the original '__main' never
# matched, so the script did nothing).  The channel is passed inside a list:
# subs_loop iterates its argument, and a bare "game" would be split into the
# channels g, a, m and e.
if __name__ == '__main__':
    ps.subs_loop(["game"], process)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
subscriber of pub/sub pattern
Usage:
python -m ps.tap <channel> <channel2> ...
"""
import sys

from ps import tap

if __name__ == '__main__':
    # Pass every channel name as ONE list.  Unpacking with * would bind the
    # first name to tap's <channels> (iterated character by character) and
    # the second to <out_file_spec>.
    tap(sys.argv[1:])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment