Last active
August 27, 2018 21:27
-
-
Save bootrino/d621bfa4040f970e7be5f2472074b710 to your computer and use it in GitHub Desktop.
Incomplete stuff needed for asynch Postgres notification listen/notify
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# this would need to be thought through and edited to make it work properly. The bits are cut out of a larger working solution. | |
import psycopg2 | |
import psycopg2.extras | |
import aiopg | |
import asyncio | |
import json | |
from urllib.parse import quote | |
import atexit | |
from psycopg2.extensions import AsIs | |
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT | |
db_params = { | |
'database': 'dbname', | |
'user': 'dbusername', | |
'password': 'supersecret', | |
'host': '127.0.0.1', | |
'port': '5432', | |
} | |
conn_synch = None | |
cur_synch = None | |
def send_notification_postgres(channel, payload): | |
if not isinstance(payload, str): | |
raise Exception("payload must be a string") | |
# careful about incompatible quoting between javascript and python | |
# this needs to match the function on the javascript side | |
# https://stackoverflow.com/a/6618858/627492 | |
payload = quote(payload, safe='~()*!.\'') | |
# binding the channel name cause this problem: https://github.com/psycopg/psycopg2/issues/423 | |
print(cur_synch.mogrify("""NOTIFY %s, %s""", (AsIs(channel), payload, ))) | |
cur_synch.execute("""NOTIFY %s, %s""", (AsIs(channel), payload, )) | |
# {"topic" : "broadcasts_to_converters", "message_id" : "270"} | |
async def inbound_broadcast_listener_postgres(conn_asynch): | |
global act_on_listen_notifications | |
async with conn_asynch.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur_asynch: | |
print("listening to broadcasts_to_converters channel on Postgres....") | |
await cur_asynch.execute("LISTEN broadcasts_to_converters") | |
while True: | |
notify = await conn_asynch.notifies.get() | |
print("Got NOTIFY:", notify.pid, notify.channel, notify.payload) | |
if act_on_listen_notifications == True: | |
print('act_on_listen_notifications is True, starting processing') | |
# we need to ensure only one message at a time is acted on | |
act_on_listen_notifications = False | |
await get_a_render_job_and_process_it() | |
else: | |
print('act_on_listen_notifications is False, not acting on received notification') | |
async def main(): | |
# disconnect from postgres if program exits | |
atexit.register(postgres_disconnect) | |
postgres_connect() | |
async with aiopg.create_pool(**db_params) as pool: | |
async with pool.acquire() as conn_asynch: | |
listener = inbound_broadcast_listener_postgres(conn_asynch) | |
await asyncio.gather(listener) | |
loop = asyncio.get_event_loop() | |
loop.run_until_complete(main()) | |
loop.close() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment