Skip to content

Instantly share code, notes, and snippets.

@dtheodor
Last active August 7, 2023 11:38
Show Gist options
  • Star 31 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save dtheodor/3862093af36a1aeb8104 to your computer and use it in GitHub Desktop.
Save dtheodor/3862093af36a1aeb8104 to your computer and use it in GitHub Desktop.
Listen for pg_notify with SQL Alchemy + Psycopg2
import select
import datetime
import psycopg2
import psycopg2.extensions
from sqlalchemy import create_engine, text
engine = create_engine("postgresql+psycopg2://vagrant@/postgres")
#conn = psycopg2.connect(database="postgres", user="vagrant")
#conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
#curs = conn.cursor()
#curs.execute("LISTEN test1;")
#curs.execute("LISTEN test2;")
conn = engine.connect()
conn.execute(text("LISTEN test1; LISTEN test2").execution_options(autocommit=True))
print "Waiting for notifications on channels 'test1', 'test2' with SQL Alchemy"
while 1:
#conn.commit()
if select.select([conn.connection],[],[],5) == ([],[],[]):
print "Timeout"
else:
conn.connection.poll()
while conn.connection.notifies:
notify = conn.connection.notifies.pop()
print "Got NOTIFY:", datetime.datetime.now(), notify.pid, notify.channel, notify.payload
@jn0
Copy link

jn0 commented Apr 14, 2021

It works.

But if one make use of pgbouncer, then it will fail.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment