Skip to content

Instantly share code, notes, and snippets.

@karlrwjohnson
Created November 7, 2017 22:41
Show Gist options
  • Save karlrwjohnson/526c0061a9c7a8d47556c76794c3d273 to your computer and use it in GitHub Desktop.
Save karlrwjohnson/526c0061a9c7a8d47556c76794c3d273 to your computer and use it in GitHub Desktop.
PostgreSQL Message Queue Demo
'''
Postgres automatic message queue demo

Wherein a record-insertion trigger notifies a worker to process new table records.

Requires:
- Python 3.6   (f-strings)
- Postgres 9.6 (SKIP LOCKED)
- Psycopg2

Usage:
CONNECTION_ARGS="--host localhost --port 5432 --user $USER --password xxxxx"
message_queue_demo.py $CONNECTION_ARGS --install                       # Create requisite tables
message_queue_demo.py $CONNECTION_ARGS --worker                        # One or more workers
message_queue_demo.py $CONNECTION_ARGS --message "Message 1" "Message 2" ...
'''
import argparse
import getpass
import json
import select
import sys
import traceback
from contextlib import closing
from textwrap import dedent

import psycopg2
# ---- Command-line interface ----
parser = argparse.ArgumentParser()
parser.add_argument('--host', help='Postgres hostname', default='localhost')
# type=int so a CLI-supplied port arrives as an integer, matching the default.
parser.add_argument('--port', help='Postgres port', type=int, default=5432)
parser.add_argument('--user', help='Postgres username', default=getpass.getuser())
parser.add_argument('--password', help='Postgres password')

# Exactly one mode may be chosen per invocation.
group = parser.add_mutually_exclusive_group()
group.add_argument('--install', action='count',
                   help='Create the queue table, trigger function, and trigger')
group.add_argument('--worker', action='count',
                   help='Run a worker that processes queued records')
group.add_argument('--message', type=str, nargs='*',
                   help='Insert one or more messages into the queue')

args = parser.parse_args()

# ---- Configuration ----
# All database object names are derived from the table name so they stay
# consistently prefixed.
TABLE_NAME = 'my_event_queue'
CHANNEL_NAME = f'{TABLE_NAME}_channel'
PROC_NAME = f'notify_{CHANNEL_NAME}'
TRIGGER_NAME = f'trigger_{CHANNEL_NAME}'
# Idempotent setup DDL: drops and recreates the queue table, a plpgsql
# trigger function that fires NOTIFY on CHANNEL_NAME, and a statement-level
# AFTER INSERT trigger wired to that function. The NOTIFY carries no
# payload -- workers re-scan the table for PENDING rows when woken.
SETUP_SCRIPT = dedent(f'''\
DROP TRIGGER IF EXISTS {TRIGGER_NAME} on {TABLE_NAME};
DROP FUNCTION IF EXISTS {PROC_NAME} ();
DROP TABLE IF EXISTS {TABLE_NAME};
CREATE TABLE {TABLE_NAME} (
id BIGSERIAL,
state TEXT DEFAULT 'PENDING',
error_detail TEXT,
type TEXT,
job JSONB
);
CREATE OR REPLACE FUNCTION {PROC_NAME} ()
RETURNS trigger
LANGUAGE plpgsql
AS
$$
BEGIN
NOTIFY {CHANNEL_NAME};
RETURN NEW;
END;
$$;
CREATE TRIGGER {TRIGGER_NAME}
AFTER INSERT
ON {TABLE_NAME}
FOR EACH STATEMENT
EXECUTE PROCEDURE {PROC_NAME}();
''')
def get_conn():
    """Open and return a fresh psycopg2 connection from the parsed CLI args."""
    return psycopg2.connect(
        host=args.host,
        port=args.port,
        user=args.user,
        password=args.password,
    )
def process_queue(cursor):
    '''Claim and process every PENDING row of type 'ECHO' in the queue table.

    Assumes the connection is in AUTOCOMMIT isolation (the worker branch sets
    it before calling), so a transaction must be opened explicitly here for
    FOR UPDATE to hold row locks.
    '''
    print(f'Processing queue')
    cursor.execute('START TRANSACTION');
    try:
        # I should have used an ORM here.
        # Also, the way this works, you can only really have a single worker
        # going at a time -- each worker locks all the processable rows at time
        # of the query. A second worker could *maybe* come along and grab any new
        # events that are generated when the first is processing, but the balance
        # could by highly asymmetrical. It might be better to use LIMIT in the SELECT
        # clause to prevent a worker from claiming a bigger workload than it can
        # handle.
        cursor.execute(dedent(f'''\
            SELECT id, state, type, job
            FROM {TABLE_NAME}
            WHERE state = %s AND type = %s
            FOR UPDATE
            SKIP LOCKED
            ;
            '''), (
            'PENDING',
            'ECHO',
        ))
        results = cursor.fetchall()
        print(f'Processing {len(results)} items')
        for result in results:
            _id, _state, _type, _job = result
            # Mark as "PROCESSING" so we know it was at least picked up
            # should anything go wrong.
            cursor.execute(dedent(f'''\
                UPDATE {TABLE_NAME}
                SET state = %s
                WHERE id = %s
                '''), (
                'PROCESSING',
                _id,
            ))
            # Commit early and often so we remember which records have been processed.
            # NOTE(review): this COMMIT ends the transaction opened above, which
            # releases the FOR UPDATE locks on the *remaining* selected rows --
            # another worker could claim them mid-loop. Confirm this is intended.
            cursor.execute('COMMIT')
            try:
                # _job comes from the JSONB column; subscripted as a mapping.
                message = _job['message']
                # Demo hook: a payload containing 'cause error' deliberately
                # exercises the error-recording path below.
                assert 'cause error' not in message
                print(f'Message = \033[1;33m{message}\033[0m')
            except Exception:
                # On failure, record the traceback on the row itself so the
                # failed job is visible and will not be picked up again.
                exception = traceback.format_exc()
                cursor.execute(dedent(f'''\
                    UPDATE {TABLE_NAME}
                    SET state = %s, error_detail = %s
                    WHERE id = %s
                    '''), (
                    'ERROR',
                    exception,
                    _id,
                ))
                cursor.execute('COMMIT')
            else:
                # Success path: mark the row fully processed.
                cursor.execute(dedent(f'''\
                    UPDATE {TABLE_NAME}
                    SET state = %s
                    WHERE id = %s
                    '''), (
                    'PROCESSED',
                    _id,
                ))
                cursor.execute('COMMIT')
    finally:
        # Discard any transaction still open if something above raised
        # before its COMMIT, releasing held locks.
        cursor.execute('ROLLBACK')
######
# Entry point: dispatch on the mutually-exclusive mode flags.
if args.install:
    # --install: (re)create the queue table, trigger function, and trigger.
    with closing(get_conn()) as conn:
        with closing(conn.cursor()) as cursor:
            cursor.execute(SETUP_SCRIPT)
        conn.commit()
elif args.worker:
    # --worker: drain the queue once, then block on LISTEN/NOTIFY wakeups.
    with closing(get_conn()) as conn:
        # AUTOCOMMIT isolation is needed for asynchronous notification
        # delivery; process_queue manages its own explicit transactions.
        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
        with closing(conn.cursor()) as cursor:
            # Subscribe BEFORE the initial drain: if we drained first, a
            # NOTIFY fired between the drain and the LISTEN would be lost
            # and its rows would sit unprocessed until the next insert.
            print(f'Waiting for notifications on channel {CHANNEL_NAME!r}')
            cursor.execute(f'LISTEN {CHANNEL_NAME};')
            process_queue(cursor)
            while True:
                # select() sleeps until the connection's socket has data;
                # timeout=None means block indefinitely.
                if select.select([conn], [], [], None) == ([], [], []):
                    # Timeout branch -- unreachable while timeout is None.
                    pass
                else:
                    print(f'Polling')
                    conn.poll()
                    # Drain all pending notifications, then scan the table
                    # once -- notifications carry no payload, so one scan
                    # covers any number of coalesced NOTIFYs.
                    while conn.notifies:
                        notify = conn.notifies.pop(0)
                        print(f'Got NOTIFY: channel={notify.channel!r}, payload={notify.payload!r}, pid={notify.pid!r}')
                    process_queue(cursor)
elif args.message:
    # --message: enqueue one row per message; the insert trigger NOTIFYs
    # any listening workers.
    with closing(get_conn()) as conn:
        with closing(conn.cursor()) as cursor:
            for message in args.message:
                print(f"Inserting {message} into {TABLE_NAME}")
                cursor.execute(dedent(f'''\
                    INSERT INTO {TABLE_NAME} (type, job)
                    VALUES (%s, %s)
                    '''), (
                    'ECHO',
                    json.dumps({"message": message}),
                ))
        conn.commit()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment