@kissgyorgy
Created September 4, 2020 16:37
How to use PostgreSQL's LISTEN/NOTIFY as a simple message queue with psycopg2 and asyncio

listen.py:

import asyncio
import psycopg2

# dbname should be the same for the notifying process
conn = psycopg2.connect(host="localhost", dbname="example", user="example", password="example")
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

cursor = conn.cursor()
cursor.execute(f"LISTEN match_updates;")

def handle_notify():
    # Called by the event loop whenever the connection's socket becomes readable
    conn.poll()
    for notify in conn.notifies:
        print(notify.payload)
    conn.notifies.clear()

# It works with uvloop too:
# import uvloop
# loop = uvloop.new_event_loop()
# asyncio.set_event_loop(loop)

loop = asyncio.get_event_loop()
loop.add_reader(conn, handle_notify)
loop.run_forever()
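
To use the notifications as an actual queue rather than printing them, the reader callback can hand each notification to an asyncio.Queue that a coroutine consumes. A minimal sketch, assuming the same kind of `conn` as in the listener; `make_notify_handler` and `consume` are hypothetical names, not part of psycopg2:

```python
import asyncio


def make_notify_handler(conn, queue):
    """Return a reader callback that moves pending notifications into `queue`."""

    def handle_notify():
        conn.poll()  # let psycopg2 read whatever arrived on the socket
        while conn.notifies:
            # conn.notifies is a plain list; pop from the front to keep order
            queue.put_nowait(conn.notifies.pop(0))

    return handle_notify


async def consume(queue, handler):
    """Pass the payload of each queued notification to `handler`, forever."""
    while True:
        notify = await queue.get()
        handler(notify.payload)
```

The callback would be registered with `loop.add_reader(conn, make_notify_handler(conn, queue))`, and `consume(queue, print)` would run as a task on the same loop.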

notify.py:

import time
import psycopg2

# dbname should be the same for the listening process
conn = psycopg2.connect(host="localhost", dbname="example", user="example", password="example")

cursor = conn.cursor()
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

while True:
    val = time.time()
    cursor.execute(f"NOTIFY match_updates, '{val}';")
    time.sleep(1)

@anishamsul123

May I ask where you connect to the database?

@jonesnc

jonesnc commented Dec 27, 2021

@anishamsul123 This is where they connect to the database:

conn = psycopg2.connect(host="localhost", dbname="example", user="example", password="example")

@kartikdc

Does anyone know why conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) is important here?

I'm not doubting that it is important: when I took the line out, the program didn't work. I'm just asking out of curiosity.

@kissgyorgy
Author

kissgyorgy commented Mar 31, 2022

@kartikdc It's a fairly long and complicated story.

psycopg2 is implemented according to PEP-249 (Python Database API Specification v2.0, "dbapi" for short), which states:

Note that if the database supports an auto-commit feature, this must be initially off.

So psycopg2 implicitly opens a transaction at the first statement and does not commit it; whenever you want to finish the transaction, you have to COMMIT explicitly, unless you turn on "autocommit" mode (Django does this by default, deviating from PEP 249), which is essentially what is implemented here.
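
As a side note, psycopg2 also exposes autocommit as a connection attribute, which for this purpose should behave like the set_isolation_level call in the gist. A small sketch; `start_listening` is a hypothetical helper name, and `conn` is assumed to be a freshly opened psycopg2 connection:

```python
def start_listening(conn):
    """Switch `conn` to autocommit and subscribe it to the match_updates channel."""
    # Set this while no transaction is open, e.g. right after connecting.
    conn.autocommit = True
    with conn.cursor() as cursor:
        # With autocommit on, LISTEN takes effect right away; no COMMIT is needed.
        cursor.execute("LISTEN match_updates;")
```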

LISTEN and NOTIFY take effect only after you commit the transaction:

Firstly, if a NOTIFY is executed inside a transaction, the notify events are not delivered until and unless the transaction is committed.

I implemented it this way because

  • I found this behavior more intuitive: you most likely expect the message to appear instantly on the other side, and you probably don't have other state-modifying statements in the same daemon/process.
  • it's the same behavior as the Django default

You can change the scripts to manually commit instead of autocommit (it might be a valid use-case).

for listen.py:

--- listen-original.py  2022-03-31 21:12:46.164175561 +0200
+++ listen-new.py       2022-03-31 21:12:53.194317498 +0200
@@ -3,10 +3,10 @@

 # dbname should be the same for the notifying process
 conn = psycopg2.connect(host="localhost", dbname="example", user="example", password="example")
-conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

 cursor = conn.cursor()
 cursor.execute(f"LISTEN match_updates;")
+conn.commit()

 def handle_notify():
     conn.poll()

for notify.py:

--- notify-original.py  2022-03-31 21:10:50.661843844 +0200
+++ notify-new.py       2022-03-31 21:10:41.771664388 +0200
@@ -5,9 +5,9 @@
 conn = psycopg2.connect(host="localhost", dbname="example", user="example", password="example")

 cursor = conn.cursor()
-conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)

 while True:
     val = time.time()
     cursor.execute(f"NOTIFY match_updates, '{val}';")
+    conn.commit()
     time.sleep(1)

If you make these changes, you can turn off autocommit and it should behave the same way.
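
Manual commits are handy when several notifications, or a notification plus other writes, should become visible together. A sketch under that assumption; `notify_batch` is a hypothetical helper, `conn` is assumed to be a psycopg2 connection with autocommit off, and the helper calls pg_notify() so the payloads can be passed as query parameters:

```python
def notify_batch(conn, channel, payloads):
    """Send every payload on `channel` inside a single transaction."""
    # In psycopg2, `with conn:` commits on success and rolls back on an
    # exception; it does not close the connection.
    with conn:
        with conn.cursor() as cursor:
            for payload in payloads:
                cursor.execute("SELECT pg_notify(%s, %s);", (channel, str(payload)))
    # Listeners receive nothing until the commit at the end of the outer block.
```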

@kartikdc

kartikdc commented Apr 1, 2022

@kissgyorgy Thanks very much for the detailed reply. You went above and beyond. Highly appreciated.

@rishigc

rishigc commented Jul 31, 2022

@kissgyorgy this is absolutely fantastic. I'm trying to ATTACH PARTITIONS to a table on inserts by using the listener. If I run the statements in listen-new.py and then perform my required insert statements right after it, how do I exit/terminate the listener (since the script uses loop.run_forever())? I intend to run the listener only while I am running the insert statements through my Python program.

@kissgyorgy
Author

@kissgyorgy this is absolutely fantastic. I'm trying to ATTACH PARTITIONS to a table on inserts by using the listener. If I run the statements in listen-new.py and then perform my required insert statements right after it, how do I exit/terminate the listener (since the script uses loop.run_forever())? I intend to run the listener only while I am running the insert statements through my Python program.

I don't understand exactly what you are trying to do, but maybe a TRIGGER would be better for your use-case? Triggers can be used for partitioned tables too:

Creating a row-level trigger on a partitioned table will cause an identical “clone” trigger to be created on each of its existing partitions; and any partitions created or attached later will have an identical trigger, too.

https://www.postgresql.org/docs/current/sql-createtrigger.html
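
On the narrower question of stopping the listener: instead of loop.run_forever(), one option is to wait on an asyncio.Event and remove the reader once it is set. A rough sketch, not something from the gist itself; `listen_until` is a hypothetical name, and `conn` can be any object with a fileno() method, such as a psycopg2 connection:

```python
import asyncio


async def listen_until(conn, handle_notify, stop):
    """Call `handle_notify` whenever `conn` is readable, until `stop` is set."""
    loop = asyncio.get_running_loop()
    loop.add_reader(conn, handle_notify)
    try:
        await stop.wait()
    finally:
        # Always unregister, so the loop stops watching the connection.
        loop.remove_reader(conn)
```

The program doing the inserts would create `stop = asyncio.Event()`, start `listen_until(conn, handle_notify, stop)` as a task, and call `stop.set()` once the inserts are done.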

@d33tah

d33tah commented Sep 27, 2022

Isn't there an SQL injection in the NOTIFY part?

@kissgyorgy
Author

Isn't there an SQL injection in the NOTIFY part?

  1. If you look at the definition of SQL injection on Wikipedia, the second sentence says "... when user input is either incorrectly filtered...". There is no user input in the snippet.
  2. This is an example of how to wire it up; people shouldn't copy-paste code from the internet verbatim.

@kartikdc

@d33tah You can call the pg_notify function instead of the NOTIFY directive. You should be able to do parameter substitution with psycopg2 then.
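
A minimal sketch of that suggestion: pg_notify() is an ordinary SQL function, so the channel and payload can be passed as query parameters instead of being formatted into the statement. `send_notification` is a hypothetical helper name, and JSON is only one possible payload format:

```python
import json


def send_notification(cursor, channel, data):
    """Serialize `data` to JSON and send it on `channel` via pg_notify()."""
    payload = json.dumps(data)
    # psycopg2 quotes both values itself, so no manual escaping is needed.
    cursor.execute("SELECT pg_notify(%s, %s);", (channel, payload))
    return payload
```

With the notifying script's cursor, this would be called as `send_notification(cursor, "match_updates", {"time": val})`.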

@ubarar

ubarar commented Oct 15, 2023

Thank you so much for the detailed reply!

@jamesbraza

It would be quite useful, @kissgyorgy, if you had time to make a similar gist for psycopg version 3.
