Skip to content

Instantly share code, notes, and snippets.

@marcocitus
Last active December 9, 2020 05:21
Show Gist options
  • Star 8 You must be signed in to star a gist
  • Fork 4 You must be signed in to fork a gist
  • Save marcocitus/cd3e52ea897de2200f103adb6b511197 to your computer and use it in GitHub Desktop.
Save marcocitus/cd3e52ea897de2200f103adb6b511197 to your computer and use it in GitHub Desktop.
Prototype for PubSub on PG 10 with Citus 7
/* commands to run on the coordinator */
CREATE EXTENSION citus;

/* register the two data nodes, then call start_metadata_sync_to_node for
   every row in pg_dist_node */
SELECT master_add_node('10.0.0.2', 5432);
SELECT master_add_node('10.0.0.3', 5432);
SELECT start_metadata_sync_to_node(nodename, nodeport) FROM pg_dist_node;

/* this statement needs its own terminating semicolon; without it the SET
   runs into the CREATE TABLE below and the script fails to parse */
SET citus.replication_model TO 'streaming';

/* the event log that consumers subscribe to */
CREATE TABLE events (
    event_id    bigserial   PRIMARY KEY,
    ingest_time timestamptz DEFAULT now(),
    topic_name  text        NOT NULL,
    payload     jsonb
);

/* distribute the event log across the data nodes by event_id */
SELECT create_distributed_table('events', 'event_id');
/* commands to run on the data nodes */
CREATE EXTENSION IF NOT EXISTS citus;
CREATE EXTENSION IF NOT EXISTS dblink;

/*** LEASES ***/

/*
 * shard leases on the local node
 *
 * One row per (consumer_group, shard_id):
 *   owner          - consumer that currently holds the lease
 *   new_owner      - consumer that has asked to take the lease over, if any
 *   last_heartbeat - last time the lease was claimed or polled
 */
CREATE TABLE leases (
    consumer_group  text        NOT NULL,
    shard_id        bigint      NOT NULL,
    owner           text,
    new_owner       text,
    last_heartbeat  timestamptz,
    PRIMARY KEY (consumer_group, shard_id)
);
/*
 * get leases within a consumer group on the given node
 *
 * Connects to node_name:node_port (same database name as the current one)
 * through dblink and returns that node's leases rows for group_name.
 * Any error is downgraded to a WARNING, in which case no rows are returned.
 */
CREATE OR REPLACE FUNCTION remote_get_leases(
    node_name text,
    node_port int,
    group_name text)
RETURNS SETOF leases
LANGUAGE plpgsql
AS $function$
DECLARE
    conn_info    text;
    lease_query  text;
BEGIN
    /* built inside the block so any failure is still covered by the handler */
    conn_info := format('host=%s port=%s dbname=%s', node_name, node_port, current_database());
    lease_query := format('SELECT leases FROM leases WHERE consumer_group = %L', group_name);

    RETURN QUERY
    SELECT (remote.lease_row).*
    FROM dblink(conn_info, lease_query) AS remote(lease_row leases);
EXCEPTION
    WHEN others THEN
        /* best effort: report the failing node and carry on */
        RAISE WARNING '%:% %', node_name, node_port, SQLERRM;
END;
$function$;
/*
 * claim a shard lease in the local leases table on behalf of a consumer
 *
 * Arguments:
 *   group_name    - consumer group the lease belongs to
 *   source_node   - consumer requesting the lease
 *   claimed_shard - shard id being claimed
 *
 * If no lease exists for (group_name, claimed_shard), source_node becomes the
 * owner. If a lease exists and nobody else is waiting for it (new_owner IS
 * NULL), source_node is recorded as new_owner. Otherwise nothing changes.
 *
 * Returns true when a lease row was inserted or updated, false when the
 * request was rejected because another consumer is already queued as
 * new_owner.
 */
CREATE OR REPLACE FUNCTION claim_lease(
    group_name text,
    source_node text,
    claimed_shard bigint)
RETURNS bool LANGUAGE plpgsql AS $function$
DECLARE
    /* one logical replication slot per (consumer group, shard) */
    shard_slot text := format('%s_%s', group_name, claimed_shard);
BEGIN
    /* create a replication slot before any writes occur */
    IF NOT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = shard_slot) THEN
        PERFORM pg_create_logical_replication_slot(shard_slot, 'test_decoding');
    END IF;

    INSERT INTO leases (consumer_group, shard_id, owner, last_heartbeat)
    VALUES (group_name, claimed_shard, source_node, now())
    ON CONFLICT (consumer_group, shard_id) DO UPDATE
        SET new_owner = source_node, last_heartbeat = now()
        WHERE leases.new_owner IS NULL;

    /* FOUND is false when the conflict update was filtered out by the WHERE
       clause, i.e. the claim did not take effect */
    RETURN FOUND;
END;
$function$;
/*
 * claim a shard on a remote node
 *
 * Runs claim_lease(group_name, source_node, shard_id) on node_name:node_port
 * through dblink and passes its boolean result back. Any error is downgraded
 * to a WARNING, in which case no row is returned.
 */
CREATE OR REPLACE FUNCTION remote_claim_lease(
    node_name text,
    node_port int,
    group_name text,
    source_node text,
    shard_id bigint)
RETURNS SETOF bool
LANGUAGE plpgsql
AS $function$
DECLARE
    conn_info    text;
    claim_query  text;
BEGIN
    /* built inside the block so any failure is still covered by the handler */
    conn_info := format('host=%s port=%s dbname=%s', node_name, node_port, current_database());
    claim_query := format('SELECT claim_lease(%L,%L,%s)', group_name, source_node, shard_id);

    RETURN QUERY
    SELECT remote.claimed
    FROM dblink(conn_info, claim_query) AS remote(claimed bool);
EXCEPTION
    WHEN others THEN
        /* best effort: report the failing node and carry on */
        RAISE WARNING '%:% %', node_name, node_port, SQLERRM;
END;
$function$;
/*
 * distributed algorithm for dividing leases among consumers
 *
 * Arguments:
 *   group_name  - consumer group whose leases are being divided
 *   source_node - name of the calling consumer; it is passed to
 *                 remote_claim_lease and compared against the owner and
 *                 new_owner columns of the leases reported by each node
 *
 * Returns one (node_name, node_port, shard_id) row for every shard of the
 * events table that this call tried to claim, tried to steal, or that already
 * lists source_node as owner or new_owner. The boolean produced by
 * remote_claim_lease is not inspected, so a returned row does not by itself
 * prove that the claim took effect.
 *
 * Steps (one CTE each):
 *   shard_owners         - placements of the events shards on nodes with
 *                          noderole = 'primary', left-joined with the leases
 *                          each node reports through remote_get_leases; a
 *                          shard without a lease has NULL owner and new_owner.
 *                          Rows are shuffled with ORDER BY random().
 *   claimed_shards       - calls remote_claim_lease for every shard whose
 *                          owner is NULL.
 *   consumer_count       - number of distinct owner and new_owner values,
 *                          counting source_node itself as well.
 *   min_claimable_count  - number of events shards divided by consumer_count
 *                          (integer division, so the result is rounded down).
 *   num_shards_remaining - min_claimable_count minus the shards claimed above
 *                          minus the shards that already list source_node as
 *                          owner or new_owner; when this is zero or negative
 *                          nothing is stolen.
 *   stealable_shards     - shards owned by another consumer for which
 *                          source_node is not already the new_owner, numbered
 *                          with row_number() OVER () (presumably following the
 *                          shuffled order of shard_owners).
 *   stolen_shards        - calls remote_claim_lease for the first
 *                          num_shards_remaining stealable shards.
 *
 * NOTE(review): remote_get_leases turns connection errors into a warning and
 * returns no rows, so leases on an unreachable node show up as unowned in
 * shard_owners - confirm that this is acceptable.
 */
CREATE OR REPLACE FUNCTION obtain_leases(
group_name text,
source_node text,
OUT node_name text,
OUT node_port int,
OUT shard_id bigint)
RETURNS SETOF record LANGUAGE sql AS $function$
/* find the current owners of all shards */
WITH shard_owners AS (
SELECT n.nodename, n.nodeport, shardid, owner, new_owner
FROM pg_dist_node n
JOIN pg_dist_placement p USING (groupid)
JOIN pg_dist_shard s USING (shardid)
LEFT JOIN remote_get_leases(nodename, nodeport, group_name) l ON (shardid = l.shard_id)
WHERE logicalrelid = 'events'::regclass AND noderole = 'primary'
ORDER BY random()
),
/* claim all unclaimed shards */
claimed_shards AS (
SELECT nodename, nodeport, shardid, remote_claim_lease(nodename, nodeport, group_name, source_node, shardid)
FROM shard_owners
WHERE owner IS NULL
),
/* determine total number of consumers */
consumer_count AS (
SELECT count(DISTINCT node)
FROM (SELECT owner AS node FROM shard_owners WHERE owner IS NOT NULL
UNION ALL
SELECT new_owner AS node FROM shard_owners WHERE new_owner IS NOT NULL
UNION ALL
SELECT source_node) all_consumers
),
/* determine minimum number of shards to claim */
min_claimable_count AS (
SELECT count(*) / (SELECT * FROM consumer_count)
FROM pg_dist_shard
WHERE logicalrelid = 'events'::regclass
),
/* determine number of slots to claim (#shards/#consumers - #claimed - #held) */
num_shards_remaining AS (
SELECT
(SELECT * FROM min_claimable_count) -
(SELECT count(*) FROM claimed_shards) -
count(*) AS num_shards_remaining
FROM shard_owners
WHERE owner = source_node OR new_owner = source_node
),
/* find stealable shards (randomly ordered) */
stealable_shards AS (
SELECT row_number() OVER () AS row_number, nodename, nodeport, shardid
FROM shard_owners
WHERE owner IS NOT NULL AND owner != source_node AND (new_owner IS NULL OR new_owner != source_node)
),
/* steal shards */
stolen_shards AS (
SELECT *, remote_claim_lease(nodename, nodeport, group_name, source_node, shardid)
FROM stealable_shards
WHERE row_number <= (SELECT * FROM num_shards_remaining)
)
SELECT nodename, nodeport, shardid FROM claimed_shards
UNION
SELECT nodename, nodeport, shardid FROM stolen_shards
UNION
SELECT nodename, nodeport, shardid FROM shard_owners
WHERE owner = source_node OR new_owner = source_node;
$function$;
/*** EVENT PARSING (replace with PL/Python?) ***/

/*
 * parse the schema name, table name, and tuple data from a test_decoding entry
 *
 * Expects a line shaped like
 *     table <schema>.<table>: <COMMAND>: <tuple data>
 * where schema and table are either plain lower-case identifiers or
 * double-quoted identifiers. All four outputs are NULL when the message does
 * not match.
 */
CREATE OR REPLACE FUNCTION parse_table_message(
    message text,
    OUT schema_name text,
    OUT table_name text,
    OUT command text,
    OUT tuple_data text)
RETURNS record
LANGUAGE sql
AS $function$
    /* groups 2 and 4 only capture the inside of a quoted identifier and are skipped */
    SELECT
        parts[1],
        parts[3],
        parts[5],
        parts[6]
    FROM regexp_matches(
             message,
             'table ([a-z_][a-z0-9_$]*|"(""|[^"])*")\.([a-z_][a-z0-9_$]*|"(""|[^"])*"): ([A-Z]+): (.*)'
         ) AS parts;
$function$;
/*
 * parse the column names, types and quoted values from a test_decoding tuple
 *
 * Returns one row per "name[type]:value" entry. A value is returned exactly as
 * it appears in the input: single-quoted values keep their quotes (and doubled
 * inner quotes), unquoted values run up to the next space.
 */
CREATE OR REPLACE FUNCTION parse_tuple_data(
    tuple_data text,
    OUT column_name text,
    OUT column_type text,
    OUT value text)
RETURNS SETOF record
LANGUAGE sql
AS $function$
    /* groups 2 and 5 only capture the inside of quoted text and are skipped */
    SELECT
        field[1],
        field[3],
        field[4]
    FROM regexp_matches(
             tuple_data,
             $$([a-z_][a-z0-9_$]*|"(""|[^"])*")\[([a-z0-9 ]+)\]:('(''|[^'])*'|[^ ]*)$$,
             'g'
         ) AS field;
$function$;
/*
 * convert a test_decoding tuple to a serialised record
 *
 * Turns every "name[type]:value" entry into "value::type AS name", runs the
 * resulting SELECT, and returns the single row it produces rendered as text.
 */
CREATE OR REPLACE FUNCTION tuple_data_to_record(
    tuple_data text)
RETURNS text
LANGUAGE plpgsql
AS $function$
DECLARE
    projection text;
    decoded    record;
BEGIN
    /* one cast expression per column, in the order the parser emits them */
    SELECT string_agg(value||'::'||column_type||' AS '||column_name, ', ')
    INTO projection
    FROM parse_tuple_data(tuple_data);

    EXECUTE 'SELECT '|| projection INTO decoded;

    /* the record is coerced to its text form by the declared return type */
    RETURN decoded;
END;
$function$;
/*** EVENT POLLING ***/
/*
 * poll for events within a particular shard
 *
 * Arguments:
 *   group_name            - consumer group; together with consumed_shard it
 *                           selects the lease row and names the replication
 *                           slot (group_name || '_' || consumed_shard)
 *   source_node           - calling consumer; only a lease whose owner equals
 *                           source_node is touched
 *   consumed_shard        - shard id; changes are read for the table
 *                           public.events_<consumed_shard>
 *   topic_subscription    - LIKE pattern matched against topic_name
 *                           (default '%')
 *   last_applied_event_id - event_id of the last event the caller has already
 *                           processed; NULL (the default) acknowledges nothing
 *
 * Returns up to 100 rows of (event_id, ingest_time, topic_name, payload).
 *
 * Steps (one CTE each):
 *   ownership           - refreshes last_heartbeat on the caller's lease and,
 *                         when new_owner is set, hands the lease over to it;
 *                         current_owner is true only if the caller is still
 *                         the owner after the update. No row is produced when
 *                         the caller does not own the lease, in which case
 *                         nothing is returned.
 *   changes             - peeks at pending changes in the slot without
 *                         consuming them (upto_nchanges = 5000, empty
 *                         transactions skipped) and numbers them.
 *   records             - keeps the lines for public.events_<consumed_shard>
 *                         and splits them with parse_table_message.
 *   inserts             - keeps INSERT commands, converts the tuple data to an
 *                         events row, and records the row number of the
 *                         following insert as next_row_number.
 *   topic_inserts       - inserts whose topic_name matches topic_subscription.
 *   last_applied_insert - row number of the change that carries
 *                         last_applied_event_id; if that insert is the last
 *                         one, or there are no matching inserts at all, the
 *                         row number of the last peeked change is used
 *                         instead. Empty when last_applied_event_id is NULL.
 *   applied             - when the caller is still the owner and a last
 *                         applied position exists, consumes changes from the
 *                         slot with pg_logical_slot_get_changes, passing that
 *                         row number as upto_nchanges.
 *   unapplied_inserts   - matching inserts positioned after the last applied
 *                         row number, only while the caller is the owner,
 *                         limited to 100 rows.
 *
 * The final query cross-joins applied (always a single row) with
 * unapplied_inserts, presumably so that the acknowledgement in applied is
 * evaluated as part of the same statement.
 */
CREATE OR REPLACE FUNCTION poll_events(
group_name text,
source_node text,
consumed_shard bigint,
topic_subscription text default '%',
last_applied_event_id bigint default NULL,
OUT event_id bigint,
OUT ingest_time timestamptz,
OUT topic_name text,
OUT payload jsonb)
RETURNS SETOF record LANGUAGE sql AS $function$
WITH
/* determine current lease, relinguish it if claimed by another consumer*/
ownership AS (
UPDATE leases
SET owner = Coalesce(new_owner, owner), new_owner = NULL, last_heartbeat = now()
WHERE consumer_group = group_name AND shard_id = consumed_shard AND owner = source_node
RETURNING owner = source_node AS current_owner
),
/* peek at latest changes (use changes.location instead of changes.lsn in postgres 9.6) */
changes AS (
SELECT row_number() OVER () AS row_number, changes.lsn AS lsn, changes.data
FROM pg_logical_slot_peek_changes(group_name||'_'||consumed_shard, NULL, 5000, 'skip-empty-xacts', 'on') changes
),
/* extract records from changes */
records AS (
SELECT lsn, row_number, (parse_table_message(data)).*
FROM changes
WHERE data LIKE format('table public.events_%s: %%', consumed_shard)
),
/* extract new events from records */
inserts AS (
SELECT lsn, row_number, lead(row_number) OVER () AS next_row_number, (tuple_data_to_record(tuple_data)::events).*
FROM records
WHERE command = 'INSERT' AND table_name = 'events_'||consumed_shard
),
topic_inserts AS (
SELECT * FROM inserts WHERE topic_name LIKE topic_subscription
),
/* find the row number of the last applied insert, if it's the last one then use the last change row number */
last_applied_insert AS (
SELECT CASE WHEN next_row_number IS NULL THEN last_row_number ELSE row_number END AS row_number
FROM (SELECT max(row_number) AS last_row_number FROM changes) last_row_number LEFT JOIN topic_inserts ON (true)
WHERE last_applied_event_id IS NOT NULL AND (event_id IS NULL OR event_id = last_applied_event_id)
LIMIT 1
),
/* if applicable, skip all changes up to the last applied event */
applied AS (
SELECT CASE WHEN (SELECT current_owner FROM ownership, last_applied_insert)
THEN (SELECT count(*) FROM (
SELECT pg_logical_slot_get_changes(group_name||'_'||consumed_shard, NULL, row_number::int, 'skip-empty-xacts', 'on')
FROM last_applied_insert) applied_inserts) > 0
ELSE false
END
),
/* get only the unapplied changes */
unapplied_inserts AS (
SELECT event_id, ingest_time, topic_name, payload
FROM ownership, topic_inserts
WHERE row_number > Coalesce((SELECT row_number FROM last_applied_insert), 0)
AND current_owner LIMIT 100
)
SELECT unapplied_inserts.* FROM applied, unapplied_inserts;
$function$;
/*
 * function to expire old leases
 *
 * A lease counts as stale once its last_heartbeat is more than two minutes
 * old. Stale leases nobody is waiting for are removed; stale leases with a
 * pending new_owner are handed over to that consumer.
 */
CREATE OR REPLACE FUNCTION expire_leases()
RETURNS void
LANGUAGE sql
AS $function$
    /* stale and uncontested: drop the lease */
    DELETE FROM leases AS stale
    WHERE stale.last_heartbeat < now() - interval '2 minutes'
      AND stale.new_owner IS NULL;

    /* stale but contested: promote the waiting consumer and restart the clock */
    UPDATE leases AS stale
    SET owner = stale.new_owner,
        new_owner = NULL,
        last_heartbeat = now()
    WHERE stale.last_heartbeat < now() - interval '2 minute'
      AND stale.new_owner IS NOT NULL;
$function$;
/* commands to run on a consumer node */
CREATE EXTENSION IF NOT EXISTS dblink;

/*
 * leases recorded on this consumer: one row per (consumer_group, shard_id)
 * with the node that hosts the shard and the last event id consumed from it
 */
CREATE TABLE my_leases (
    consumer_group       text    NOT NULL,
    shard_id             bigint  NOT NULL,
    node_name            text    NOT NULL,
    node_port            int     NOT NULL,
    last_consumed_event  bigint,
    PRIMARY KEY (consumer_group, shard_id)
);

/* row shape of an event as received on the consumer */
CREATE TYPE events AS (
    event_id     bigint,
    ingest_time  timestamptz,
    topic_name   text,
    payload      jsonb
);
/*
 * obtain event subscriptions
 *
 * Arguments:
 *   group_name  - consumer group to obtain leases for
 *   source_node - name of this consumer, passed on to obtain_leases
 *   pubsub_url  - dblink connection string of the node on which
 *                 obtain_leases is run
 *
 * Replaces this consumer's my_leases rows for group_name with the leases
 * returned by obtain_leases and returns how many rows were stored.
 *
 * NOTE(review): the rows are deleted before being re-inserted, so
 * last_consumed_event is reset to NULL even for leases that are obtained
 * again - confirm that this is intended.
 */
CREATE OR REPLACE FUNCTION remote_obtain_leases(
    group_name text,
    source_node text,
    pubsub_url text)
RETURNS bigint LANGUAGE plpgsql AS $function$
DECLARE
    num_new_leases bigint;
BEGIN
    DELETE FROM my_leases WHERE consumer_group = group_name;

    /* explicit target columns: do not depend on the column order of my_leases */
    INSERT INTO my_leases (consumer_group, shard_id, node_name, node_port)
    SELECT group_name, res.shard_id, res.node_name, res.node_port
    FROM dblink(pubsub_url,
                format('SELECT * FROM obtain_leases(%L,%L)', group_name, source_node))
         AS res(node_name text, node_port int, shard_id bigint);

    /* number of rows stored by the INSERT above */
    GET DIAGNOSTICS num_new_leases = ROW_COUNT;
    RETURN num_new_leases;
END;
$function$;
/*
 * build the dblink connection name used for one lease:
 * <group>_<hashtext(node name)>_<port>_<shard id>
 */
CREATE OR REPLACE FUNCTION connection_name(
    group_name text,
    node_name text,
    node_port int,
    shard_id bigint)
RETURNS text
LANGUAGE sql
AS $function$
    /* the node name is hashed rather than embedded verbatim */
    SELECT format('%s_%s_%s_%s', group_name, hashed.node_hash, node_port, shard_id)
    FROM (SELECT hashtext(node_name) AS node_hash) AS hashed;
$function$;
/*
 * open a named dblink connection for every lease of group_name that does not
 * have one yet; returns the number of connections opened
 */
CREATE OR REPLACE FUNCTION connect_to_shards(
    group_name text)
RETURNS bigint
LANGUAGE sql
AS $function$
    WITH group_leases AS (
        /* this group's leases together with the connection name each one uses */
        SELECT
            connection_name(consumer_group, node_name, node_port, shard_id) AS connection_name,
            *
        FROM my_leases
        WHERE consumer_group = group_name
    ),
    live_connections AS (
        /* names of the dblink connections already open in this session */
        SELECT conn AS connection_name
        FROM unnest(dblink_get_connections()) AS conn
    ),
    unconnected_leases AS (
        /* leases whose connection name is not among the open ones */
        SELECT gl.*
        FROM group_leases AS gl
        LEFT JOIN live_connections AS lc
            ON (gl.connection_name = lc.connection_name)
        WHERE lc.connection_name IS NULL
    )
    SELECT count(*)
    FROM unconnected_leases AS ul
    CROSS JOIN LATERAL dblink_connect(
        ul.connection_name,
        format('host=%s port=%s dbname=%s', ul.node_name, ul.node_port, current_database())
    );
$function$;
/*
 * consume events from a particular shard
 *
 * Collects the result of the query previously sent on the shard's dblink
 * connection, stores the event_id of the last fetched row in
 * my_leases.last_consumed_event, and returns the fetched events.
 * source_node and last_consumed_event_id are accepted but not used here.
 */
CREATE OR REPLACE FUNCTION consume_events(
    group_name text,
    source_node text,
    consume_shard_id bigint,
    last_consumed_event_id bigint,
    node_name text,
    node_port int)
RETURNS SETOF events
LANGUAGE sql
AS $function$
    WITH fetched AS (
        /* rows produced by the asynchronous query on this shard's connection */
        SELECT (remote.e).*
        FROM dblink_get_result(
                 connection_name(group_name, node_name, node_port, consume_shard_id),
                 false
             ) AS remote(e events)
    ),
    newest AS (
        /* event_id of the last fetched row, repeated once per fetched row */
        SELECT last_value(fetched.event_id) OVER () AS event_id
        FROM fetched
    ),
    lease_progress AS (
        /* remember how far this consumer has read on the shard */
        UPDATE my_leases
        SET last_consumed_event = newest.event_id
        FROM newest
        WHERE consumer_group = group_name
          AND shard_id = consume_shard_id
        RETURNING *
    )
    SELECT fetched.*
    FROM fetched
    CROSS JOIN lease_progress;
$function$;
/*
 * consume events across all nodes
 *
 * For every lease of group_name in my_leases: make sure a dblink connection
 * exists, send a poll_events query on it, collect the events through the
 * per-shard consume_events, and finally fetch once more from each connection.
 */
CREATE OR REPLACE FUNCTION consume_events(
    group_name text,
    source_node text)
RETURNS SETOF events
LANGUAGE plpgsql
AS $function$
BEGIN
    /* 1. open any missing connections */
    PERFORM connect_to_shards(group_name);

    /* 2. send the poll query on every connection without waiting for results */
    PERFORM dblink_send_query(
                connection_name(group_name, l.node_name, l.node_port, l.shard_id),
                format('SELECT e FROM poll_events(%L,%L,%s,''%%'',%L) e',
                       group_name, source_node, l.shard_id, l.last_consumed_event))
    FROM my_leases AS l
    WHERE l.consumer_group = group_name;

    /* 3. collect the events shard by shard */
    RETURN QUERY
    SELECT polled.*
    FROM my_leases AS l
    CROSS JOIN LATERAL consume_events(
                   group_name, source_node, l.shard_id,
                   l.last_consumed_event, l.node_name, l.node_port) AS polled
    WHERE l.consumer_group = group_name;

    /* 4. fetch once more from every connection (presumably to clear the
          pending result so the connection can be reused) */
    PERFORM dblink_get_result(
                connection_name(group_name, l.node_name, l.node_port, l.shard_id),
                false)
    FROM my_leases AS l
    WHERE l.consumer_group = group_name;
END;
$function$;
@qwertyuu
Copy link

qwertyuu commented Dec 9, 2020

How many reads/writes per second can you do with such a messaging system?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment