@quiver

quiver/README.md

Last active Jun 14, 2021
Who says PostgreSQL can't Pub/Sub like Redis?

Pub/Sub pattern with PostgreSQL's LISTEN/NOTIFY commands

This is a simple chat-like program that uses the pub/sub pattern, backed by PostgreSQL's LISTEN and NOTIFY commands.

Publish

Publish messages to the foo channel as user nickname.

$ python pub.py foo nickname
PUBLISH to channel #foo
test message
what's up?
<Ctrl-D>
$

Publish a message to the bar channel as user mike.

$ python pub.py bar mike
PUBLISH to channel #bar
pub/sub rules!
<Ctrl-D>
$

Subscribe

Subscribe to both the foo and bar channels.

$ python sub.py foo bar
SUBSCRIBE TO channel #foo
SUBSCRIBE TO channel #bar
#foo - [nickname]:test message
#foo - [nickname]:what's up?
#bar - [mike]:pub/sub rules!
<Ctrl-C>
UNSUBSCRIBE FROM channel #foo
UNSUBSCRIBE FROM channel #bar
$

# vim: set fileencoding=utf8
# publisher of pub/sub pattern
# Usage:
# $ python pub.py <channel> <nickname>
# http://initd.org/psycopg/docs/advanced.html#asynchronous-notifications
import sys
import psycopg2
import psycopg2.extensions

DSN = "dbname=dev user=jsmith"
conn = psycopg2.connect(DSN)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()

channel = sys.argv[1]
nickname = sys.argv[2]
print "PUBLISH to channel #%s" % channel

while True:
    try:
        message = raw_input()
        chat_message = psycopg2.extensions.QuotedString(
            '[%s]:%s' % (nickname, message)).getquoted()
        curs.execute("NOTIFY %s, %s;" % (channel, chat_message))
        # If you want to use pg_notify function
        #curs.execute("SELECT pg_notify('%s', %s);" % (channel, chat_message))
    except EOFError:
        break
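
The publisher builds its NOTIFY statement by string interpolation, so the channel name reaches the server unescaped. A minimal Python 3 sketch of the same idea, assuming psycopg2 is installed and reusing the DSN from pub.py, passes both the channel and the payload to pg_notify as bound parameters. The names format_chat_message, publish, and main are illustrative and not part of the gist. The size check reflects PostgreSQL's default rule that a NOTIFY payload must be shorter than 8000 bytes.

```python
import sys

# In the default configuration a NOTIFY payload must be shorter than 8000 bytes.
MAX_PAYLOAD_BYTES = 7999


def format_chat_message(nickname, message):
    """Build the payload in the same '[nickname]:message' shape as pub.py."""
    return "[%s]:%s" % (nickname, message)


def publish(cursor, channel, nickname, message):
    """Send one chat message; channel and payload travel as bound parameters."""
    payload = format_chat_message(nickname, message)
    if len(payload.encode("utf-8")) > MAX_PAYLOAD_BYTES:
        raise ValueError("payload too large for NOTIFY")
    cursor.execute("SELECT pg_notify(%s, %s);", (channel, payload))
    return payload


def main(argv):
    """Hypothetical entry point mirroring pub.py: main(['pub.py', 'foo', 'nickname'])."""
    import psycopg2  # imported lazily so the helpers above work without a database

    channel, nickname = argv[1], argv[2]
    conn = psycopg2.connect("dbname=dev user=jsmith")  # DSN copied from pub.py
    conn.autocommit = True  # NOTIFY is only delivered once the transaction commits
    curs = conn.cursor()
    print("PUBLISH to channel #%s" % channel)
    for line in sys.stdin:  # the loop ends on Ctrl-D (EOF)
        publish(curs, channel, nickname, line.rstrip("\n"))
```

Calling main(sys.argv) from a script should behave much like pub.py. One difference worth keeping in mind: pg_notify uses the channel string exactly as given, whereas an unquoted LISTEN foo folds the name to lowercase, so mixed-case channel names may not match.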

# vim: set fileencoding=utf8
# subscriber of pub/sub pattern
# Usage:
# $ python sub.py <channel> <channel> ...
# http://initd.org/psycopg/docs/advanced.html#asynchronous-notifications
import select
import sys
import psycopg2
import psycopg2.extensions

DSN = "dbname=dev user=jsmith"
conn = psycopg2.connect(DSN)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()

for channel in sys.argv[1:]:
    print 'SUBSCRIBE TO channel #%s' % channel
    curs.execute("LISTEN %s;" % channel)

# select.epoll is only available on Linux
epoll = select.epoll()
epoll.register(conn, select.EPOLLIN)

while True:
    try:
        # block until the connection's socket becomes readable
        events = epoll.poll()
        conn.poll()
        while conn.notifies:
            # pop(0) takes the oldest notification first, preserving arrival order
            notify = conn.notifies.pop(0)
            print "#%s - %s" % (notify.channel, notify.payload)
    except BaseException, err:
        # Ctrl-C (KeyboardInterrupt) also lands here and ends the loop
        print err
        break

for channel in sys.argv[1:]:
    print 'UNSUBSCRIBE FROM channel #%s' % channel
    curs.execute("UNLISTEN %s;" % channel)
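
For readers on Python 3, the subscriber loop can be sketched as follows. This is one possible port under stated assumptions, not the author's code: it assumes psycopg2 2.7 or later (for the psycopg2.sql module), swaps epoll for select.select so that it is not tied to Linux, and quotes channel names as identifiers. The function names drain_notifications and listen_forever are illustrative.

```python
import select


def drain_notifications(conn):
    """Poll the connection and return pending notifications, oldest first."""
    conn.poll()
    lines = []
    while conn.notifies:
        notify = conn.notifies.pop(0)  # pop(0) keeps arrival order
        lines.append("#%s - %s" % (notify.channel, notify.payload))
    return lines


def listen_forever(dsn, channels, timeout=5.0):
    """Subscribe to every channel and print messages until Ctrl-C."""
    import psycopg2  # imported lazily so drain_notifications works without a database
    from psycopg2 import sql

    conn = psycopg2.connect(dsn)
    conn.autocommit = True
    curs = conn.cursor()
    for channel in channels:
        print("SUBSCRIBE TO channel #%s" % channel)
        # sql.Identifier quotes the name instead of pasting it into the statement
        curs.execute(sql.SQL("LISTEN {};").format(sql.Identifier(channel)))
    try:
        while True:
            # wait until the connection's socket is readable, or the timeout expires
            readable, _, _ = select.select([conn], [], [], timeout)
            if readable:
                for line in drain_notifications(conn):
                    print(line)
    except KeyboardInterrupt:
        pass
    finally:
        for channel in channels:
            print("UNSUBSCRIBE FROM channel #%s" % channel)
            curs.execute(sql.SQL("UNLISTEN {};").format(sql.Identifier(channel)))
        conn.close()
```

A call such as listen_forever("dbname=dev user=jsmith", ["foo", "bar"]) would correspond to python sub.py foo bar.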

@shridharmanvi shridharmanvi commented Jul 6, 2017

Thanks for the example.

For macOS users, epoll does not work because it is only available on Linux. Here is an alternative way to consume events, using select.select (source):

while True:
    # wait up to 5 seconds for the connection to become readable
    if select.select([conn], [], [], 5) == ([], [], []):
        print "Timeout"
    else:
        conn.poll()
        while conn.notifies:
            print 'Got notification!!'
            # pop(0) takes the oldest notification first
            notify = conn.notifies.pop(0)
            print "Got NOTIFY:", notify.pid, notify.channel, notify.payload
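
Another portable option, sketched here for Python 3, is the standard library's selectors module, whose DefaultSelector picks the most efficient mechanism the platform offers (epoll on Linux, kqueue on macOS). The helper wait_readable and the demo function are hypothetical names; the demo uses a local socket pair as a stand-in for the database connection, since any object with a fileno() method can be registered.

```python
import selectors
import socket


def wait_readable(fileobj, timeout):
    """Return True if fileobj becomes readable within timeout seconds."""
    with selectors.DefaultSelector() as sel:
        sel.register(fileobj, selectors.EVENT_READ)
        return bool(sel.select(timeout))


def demo():
    """Exercise wait_readable with a socket pair standing in for the connection."""
    reader, writer = socket.socketpair()
    try:
        before = wait_readable(reader, 0)   # nothing has been written yet
        writer.send(b"x")
        after = wait_readable(reader, 1.0)  # one byte is now waiting
        return before, after
    finally:
        reader.close()
        writer.close()
```

In the subscriber loop this would replace the select.select call, for example by calling conn.poll() only when wait_readable(conn, 5) returns True. A long-running loop would more likely create the selector once and reuse it rather than building one per call.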

@hazardland hazardland commented Jan 27, 2020

Your code showed me the true path. It is already outdated for Python 3.6, but there is now a pgpubsub module (which itself uses select internally). I use this approach to deliver news events and notifications from the database to chat server clients via WebSocket (the chat server handles its own clients and messages over WebSocket only):

import os
import pgpubsub

from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())

import websocket
import json

server = websocket.WebSocket()
server.connect('ws://127.0.0.1:7000/event')

pubsub = pgpubsub.connect(os.getenv("DB"))
pubsub.listen('chat')

for event in pubsub.events(yield_timeouts=True):
    if event is None:
        # keep websocket connection alive
        print('ping')
        server.send('{"type":"ping"}')
    else:
        # send event payload to chat server via websocket
        print(event.payload)
        server.send(event.payload)

@chokosabe chokosabe commented Sep 3, 2020

This is really nice! Thx.
