Skip to content

Instantly share code, notes, and snippets.

@MarkusH
Forked from quiver/README.md
Last active September 5, 2021 12:58
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save MarkusH/47a0904982b321a717cb to your computer and use it in GitHub Desktop.
Save MarkusH/47a0904982b321a717cb to your computer and use it in GitHub Desktop.
Python PostgreSQL PubSub

Pub/Sub pattern with PostgreSQL's LISTEN/NOTIFY command

This is a simple chat-like program using pub-sub pattern, backed by PostgreSQL's LISTEN/NOTIFY command.

Publish

publish message to foo channel from user nickname.

$ python pub.py foo nickname
PUBLISH to channel #foo
test message
what's up?
<Ctrl-D>
$

publish message to bar channel from user mike.

$ python pub.py bar mike
PUBLISH to channel #bar
pub/sub rules!
<Ctrl-D>
$

Subscribe

subscribe to foo or bar channel.

$ python sub.py foo bar
SUBSCRIBE TO channel #foo
SUBSCRIBE TO channel #bar
#foo - [nickname]:test message
#foo - [nickname]:what's up?
#bar - [mike]:pub/sub rules!
<Ctrl-C>
UNSUBSCRIBE FROM channel #foo
UNSUBSCRIBE FROM channel #bar
$
# vim: set fileencoding=utf8
# publisher of pub/sub pattern
# Usage:
# $ python pub.py <channel> <nickname>
# http://initd.org/psycopg/docs/advanced.html#asynchronous-notifications
import sys
import psycopg2
import psycopg2.extensions
DSN = "dbname=dev user=jsmith"
conn = psycopg2.connect(DSN)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()
channel = sys.argv[1]
nickname = sys.argv[2]
print "PUBLISH to channel #%s" % channel
while True:
try:
message = raw_input()
chat_message = psycopg2.extensions.QuotedString(
'[%s]:%s' % (nickname, message)).getquoted()
curs.execute("NOTIFY %s, %s;" % (channel, chat_message))
# If you want to use pg_notify function
#curs.execute("SELECT pg_notify('%s', %s);" % (channel, chat_message))
except EOFError:
break
# vim: set fileencoding=utf8
# subscriber of pub/sub pattern
# Usage:
# $ python sub.py <channel> <channel> ...
# http://initd.org/psycopg/docs/advanced.html#asynchronous-notifications
import select
import sys
import psycopg2
import psycopg2.extensions
DSN = "dbname=dev user=jsmith"
conn = psycopg2.connect(DSN)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()
for channel in sys.argv[1:]:
print 'SUBSCRIBE TO channel #%s' % channel
curs.execute("LISTEN %s;" % channel)
epoll = select.epoll()
epoll.register(conn, select.EPOLLIN)
while True:
try:
events = epoll.poll()
conn.poll()
while conn.notifies:
notify = conn.notifies.pop()
print "#%s - %s" % (notify.channel, notify.payload)
except BaseException, err:
print err
break
for channel in sys.argv[1:]:
print 'UNSUBSCRIBE FROM channel #%s' % channel
curs.execute("UNLISTEN %s;" % channel)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment