Skip to content

Instantly share code, notes, and snippets.

@bootrino
Last active August 27, 2018 21:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bootrino/d621bfa4040f970e7be5f2472074b710 to your computer and use it in GitHub Desktop.
Save bootrino/d621bfa4040f970e7be5f2472074b710 to your computer and use it in GitHub Desktop.
Incomplete stuff needed for asynch Postgres notification listen/notify
# This would need to be thought through and edited to make it work properly; the pieces were cut out of a larger working solution.
import psycopg2
import psycopg2.extras
import aiopg
import asyncio
import json
from urllib.parse import quote
import atexit
from psycopg2.extensions import AsIs
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
# Connection settings for the Postgres server. main() expands this mapping
# as keyword arguments to aiopg.create_pool().
db_params = dict(
    database='dbname',
    user='dbusername',
    password='supersecret',
    host='127.0.0.1',
    port='5432',
)

# Module-level handles for the synchronous connection and its cursor.
# Both start out as None; send_notification_postgres() reads cur_synch.
# NOTE(review): presumably assigned by postgres_connect() -- confirm.
conn_synch = cur_synch = None
def send_notification_postgres(channel, payload):
    """Send ``payload`` on the Postgres channel ``channel`` with NOTIFY.

    The payload is percent-encoded before it is sent, the fully rendered
    statement is printed, and the statement is then executed on the
    module-level synchronous cursor ``cur_synch``.

    Args:
        channel: Name of the channel to notify. It is spliced into the SQL
            text verbatim (via ``AsIs``), so it must come from a trusted
            source and must never be built from user input.
        payload: The message to send; must be a ``str``.

    Raises:
        TypeError: If ``payload`` is not a string.
        RuntimeError: If ``cur_synch`` has not been set to a cursor yet.
    """
    if not isinstance(payload, str):
        raise TypeError("payload must be a string")
    if cur_synch is None:
        # Fail with a clear message instead of an AttributeError on None.
        raise RuntimeError(
            "cur_synch is not set; connect to Postgres before sending notifications"
        )
    # careful about incompatible quoting between javascript and python
    # this needs to match the function on the javascript side
    # https://stackoverflow.com/a/6618858/627492
    encoded_payload = quote(payload, safe='~()*!.\'')
    # binding the channel name causes this problem:
    # https://github.com/psycopg/psycopg2/issues/423
    statement = """NOTIFY %s, %s"""
    params = (AsIs(channel), encoded_payload)
    print(cur_synch.mogrify(statement, params))
    cur_synch.execute(statement, params)
# Presumably an example notification payload: {"topic" : "broadcasts_to_converters", "message_id" : "270"}
async def inbound_broadcast_listener_postgres(conn_asynch):
    """Listen indefinitely on the ``broadcasts_to_converters`` channel.

    Issues LISTEN on a cursor opened from ``conn_asynch`` and then awaits
    each notification from ``conn_asynch.notifies``. A notification starts
    ``get_a_render_job_and_process_it()`` only while the module-level flag
    ``act_on_listen_notifications`` equals True; the flag is cleared before
    processing begins and this coroutine never sets it back.

    Args:
        conn_asynch: The asynchronous connection to listen on; it must
            provide ``cursor()`` and a ``notifies`` queue.
    """
    global act_on_listen_notifications
    async with conn_asynch.cursor(cursor_factory=psycopg2.extras.DictCursor) as listen_cursor:
        print("listening to broadcasts_to_converters channel on Postgres....")
        await listen_cursor.execute("LISTEN broadcasts_to_converters")
        while True:
            message = await conn_asynch.notifies.get()
            print("Got NOTIFY:", message.pid, message.channel, message.payload)
            # The explicit comparison with True is kept on purpose so that
            # non-bool flag values are treated exactly as before.
            if not (act_on_listen_notifications == True):
                print('act_on_listen_notifications is False, not acting on received notification')
                continue
            print('act_on_listen_notifications is True, starting processing')
            # we need to ensure only one message at a time is acted on
            act_on_listen_notifications = False
            await get_a_render_job_and_process_it()
async def main():
    """Connect to Postgres and run the broadcast listener.

    Registers ``postgres_disconnect`` to run at interpreter exit, calls
    ``postgres_connect()``, then opens an aiopg pool from ``db_params``,
    acquires one connection from it and awaits the listener coroutine on
    that connection.
    """
    # disconnect from postgres if program exits
    atexit.register(postgres_disconnect)
    postgres_connect()
    async with aiopg.create_pool(**db_params) as pool, pool.acquire() as conn_asynch:
        await asyncio.gather(inbound_broadcast_listener_postgres(conn_asynch))
# Script entry: drive main() to completion on the event loop.
loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
finally:
    # Close the loop even when main() raises, so it is never left open.
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment