Created
November 7, 2017 22:41
-
-
Save karlrwjohnson/526c0061a9c7a8d47556c76794c3d273 to your computer and use it in GitHub Desktop.
PostgreSQL Message Queue Demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'''
Postgres automatic message queue demo

Wherein a record-insertion trigger notifies a worker to process new table records

Requires:
- Python 3.6
- Postgres 9.6
- Psycopg2

Usage:
    CONNECTION_ARGS="--host localhost --port 5432 --user $USER --password xxxxx"
    message_queue_demo.py $CONNECTION_ARGS --install   # Create requisite tables
    message_queue_demo.py $CONNECTION_ARGS --worker    # One or more workers
    message_queue_demo.py $CONNECTION_ARGS --message "Message 1" "Message 2" ...
'''
import argparse | |
import getpass | |
import json | |
import psycopg2 | |
import sys | |
import traceback | |
from contextlib import closing | |
from textwrap import dedent | |
# Command-line interface: connection parameters plus exactly one mode flag.
parser = argparse.ArgumentParser(description='Postgres LISTEN/NOTIFY message queue demo')
parser.add_argument('--host', help='Postgres hostname', default='localhost')
# type=int so a CLI-supplied value matches the int default (argparse would
# otherwise leave it as a string).
parser.add_argument('--port', help='Postgres port', type=int, default=5432)
parser.add_argument('--user', help='Postgres username', default=getpass.getuser())
parser.add_argument('--password', help='Postgres password')
# The three operating modes are mutually exclusive: install the schema,
# run a worker, or enqueue messages.
group = parser.add_mutually_exclusive_group()
group.add_argument('--install', action='count', help='Create the queue table, trigger and notify function')
group.add_argument('--worker', action='count', help='Run a worker that processes queued messages')
group.add_argument('--message', type=str, nargs='*', help='Insert one or more messages into the queue')
args = parser.parse_args()
# Configuration: all database object names are derived from the table name.
TABLE_NAME = 'my_event_queue'
CHANNEL_NAME = f'{TABLE_NAME}_channel'
PROC_NAME = f'notify_{CHANNEL_NAME}'
TRIGGER_NAME = f'trigger_{CHANNEL_NAME}'

# Idempotent (re)installation script.
# NOTE: the table is dropped first -- dropping it also drops its trigger,
# and a standalone `DROP TRIGGER IF EXISTS ... ON <table>` raises an error
# when the table itself does not exist yet (i.e. on the very first
# --install run), despite the IF EXISTS clause.
SETUP_SCRIPT = dedent(f'''\
    DROP TABLE IF EXISTS {TABLE_NAME};
    DROP FUNCTION IF EXISTS {PROC_NAME} ();

    CREATE TABLE {TABLE_NAME} (
        id BIGSERIAL PRIMARY KEY,
        state TEXT DEFAULT 'PENDING',
        error_detail TEXT,
        type TEXT,
        job JSONB
    );

    -- Statement-level trigger function: fires one NOTIFY per INSERT
    -- statement so listening workers wake up and sweep the queue.
    CREATE OR REPLACE FUNCTION {PROC_NAME} ()
    RETURNS trigger
    LANGUAGE plpgsql
    AS
    $$
    BEGIN
        NOTIFY {CHANNEL_NAME};
        RETURN NEW;
    END;
    $$;

    CREATE TRIGGER {TRIGGER_NAME}
    AFTER INSERT
    ON {TABLE_NAME}
    FOR EACH STATEMENT
    EXECUTE PROCEDURE {PROC_NAME}();
''')
def get_conn():
    """Open a fresh psycopg2 connection using the parsed CLI arguments."""
    connect_kwargs = {
        'host': args.host,
        'port': args.port,
        'user': args.user,
        'password': args.password,
    }
    return psycopg2.connect(**connect_kwargs)
def process_queue(cursor):
    """Claim and process all PENDING 'ECHO' rows from the event-queue table.

    The connection is in AUTOCOMMIT isolation (set by the --worker branch),
    so an explicit transaction is opened here to make the row locks taken
    by SELECT ... FOR UPDATE meaningful.
    """
    print(f'Processing queue')
    cursor.execute('START TRANSACTION');
    try:
        # I should have used an ORM here.
        # Also, the way this works, you can only really have a single worker
        # going at a time -- each worker locks all the processable rows at time
        # of the query. A second worker could *maybe* come along and grab any new
        # events that are generated when the first is processing, but the balance
        # could by highly asymmetrical. It might be better to use LIMIT in the SELECT
        # clause to prevent a worker from claiming a bigger workload than it can
        # handle.
        # SKIP LOCKED lets concurrent workers pass over rows another worker
        # has already claimed instead of blocking on them.
        cursor.execute(dedent(f'''\
        SELECT id, state, type, job
        FROM {TABLE_NAME}
        WHERE state = %s AND type = %s
        FOR UPDATE
        SKIP LOCKED
        ;
        '''), (
            'PENDING',
            'ECHO',
        ))
        results = cursor.fetchall()
        print(f'Processing {len(results)} items')
        for result in results:
            _id, _state, _type, _job = result
            # Mark as "PROCESSING" so we know it was at least picked up
            # should anything go wrong.
            cursor.execute(dedent(f'''\
            UPDATE {TABLE_NAME}
            SET state = %s
            WHERE id = %s
            '''), (
                'PROCESSING',
                _id,
            ))
            # Commit early and often so we remember which records have been processed.
            # NOTE(review): this COMMIT also ends the transaction opened above,
            # which releases the FOR UPDATE locks on the *remaining* selected
            # rows -- another worker could then claim them concurrently.
            # Confirm whether that overlap is acceptable for this demo.
            cursor.execute('COMMIT')
            try:
                # _job is the JSONB payload; psycopg2 is expected to decode it
                # to a dict here -- verify against the inserting code path.
                message = _job['message']
                # Deliberate failure hook for demoing the ERROR path: any
                # message containing 'cause error' raises AssertionError.
                assert 'cause error' not in message
                print(f'Message = \033[1;33m{message}\033[0m')
            except Exception:
                # Record the failure on the row itself so operators can
                # inspect what went wrong without trawling worker logs.
                exception = traceback.format_exc()
                cursor.execute(dedent(f'''\
                UPDATE {TABLE_NAME}
                SET state = %s, error_detail = %s
                WHERE id = %s
                '''), (
                    'ERROR',
                    exception,
                    _id,
                ))
                cursor.execute('COMMIT')
            else:
                # Success path: mark the row as fully processed.
                cursor.execute(dedent(f'''\
                UPDATE {TABLE_NAME}
                SET state = %s
                WHERE id = %s
                '''), (
                    'PROCESSED',
                    _id,
                ))
                cursor.execute('COMMIT')
    finally:
        # Release any locks still held if we exited mid-batch; harmless
        # (no-op warning) when the last statement already committed.
        cursor.execute('ROLLBACK')
######
# Mode dispatch: exactly one of --install / --worker / --message is set
# (enforced by the mutually exclusive argparse group).
if args.install:
    # One-shot schema (re)installation.
    with closing(get_conn()) as conn:
        with closing(conn.cursor()) as cursor:
            cursor.execute(SETUP_SCRIPT)
        conn.commit()
elif args.worker:
    # `select` is the stdlib wrapper around the select() system call; used
    # to sleep until the socket underlying `conn` has data (i.e. a NOTIFY).
    # Hoisted here from the middle of the loop body.
    import select

    with closing(get_conn()) as conn:
        # Autocommit so LISTEN takes effect immediately and process_queue
        # can manage its own explicit transactions.
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        with closing(conn.cursor()) as cursor:
            # Subscribe *before* the initial sweep: an insert that lands
            # between the sweep and the LISTEN would otherwise sit unnoticed
            # until the next notification arrives.
            print(f'Waiting for notifications on channel {CHANNEL_NAME!r}')
            cursor.execute(f'LISTEN {CHANNEL_NAME};')
            # Initial sweep picks up anything queued while no worker ran.
            process_queue(cursor)
            while True:
                # Block until the connection's socket becomes readable.
                # A timeout of None means "wait forever".
                timeout = None
                if select.select([conn], [], [], timeout) == ([], [], []):
                    # Timed out with nothing to read (cannot happen while
                    # timeout is None; kept for when a timeout is configured).
                    pass
                else:
                    print(f'Polling')
                    conn.poll()
                    # Drain all pending notifications, then sweep the queue
                    # once -- one sweep covers however many NOTIFYs arrived.
                    while conn.notifies:
                        notify = conn.notifies.pop(0)
                        print(f'Got NOTIFY: channel={notify.channel!r}, payload={notify.payload!r}, pid={notify.pid!r}')
                    process_queue(cursor)
elif args.message:
    # Enqueue each message as a PENDING 'ECHO' job; the insert trigger
    # NOTIFYs any listening workers.
    with closing(get_conn()) as conn:
        with closing(conn.cursor()) as cursor:
            for message in args.message:
                print(f"Inserting {message} into {TABLE_NAME}")
                cursor.execute(dedent(f'''\
                INSERT INTO {TABLE_NAME} (type, job)
                VALUES (%s, %s)
                '''), (
                    'ECHO',
                    json.dumps({"message": message}),
                ))
        conn.commit()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment