Skip to content

Instantly share code, notes, and snippets.

@shuber
Last active December 8, 2021 14:51
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save shuber/8e53d42d0de40e90edaf4fb182b59dfc to your computer and use it in GitHub Desktop.
Save shuber/8e53d42d0de40e90edaf4fb182b59dfc to your computer and use it in GitHub Desktop.
-- namespace that holds every messaging object
create schema if not exists api;

-- citext gives case-insensitive text columns; pgcrypto backs the uuid default
create extension if not exists citext with schema public;
create extension if not exists pgcrypto with schema public;

-- partitioned parent table: rows are routed to a partition by receiver
create table if not exists api.messages (
    -- random uuid, so ids do not collide across services
    id       uuid        not null default gen_random_uuid(),
    -- id of the message being answered; null when this is not a reply
    reply    uuid,
    -- creation time of the message
    time     timestamptz not null default now(),
    -- name of the sending service (defaults to the current database name)
    sender   citext      not null default current_database(),
    -- name of the receiving service; also the partition key
    receiver citext      not null,
    -- message type; the set of types is defined by the services themselves
    type     citext      not null,
    -- message payload
    data     jsonb       not null default '{}'::jsonb,

    -- only json objects are accepted as payloads
    check (jsonb_typeof(data) = 'object'),
    -- the partition key has to be part of the primary key
    primary key (receiver, id)
)
-- each receiver value maps to its own service-specific partition
partition by list (receiver);
-- Build the composite key of a message as one string: "<receiver>-<id>".
-- It mirrors the (receiver, id) primary key and is meant for notifications.
--
-- Takes a row of api.messages and returns text.
-- Usage (attribute notation): select m.*, m.pk from api.messages m;
--
-- "or replace" keeps this script re-runnable, matching the other
-- "if not exists" / "or replace" definitions in this file.
create or replace function api.pk(api.messages)
returns text stable language sql as $$
    select $1.receiver || '-' || $1.id::text;
$$;
-- one partition per service ("payments" and "store"); these act as the
-- inbox/outbox tables, and a database has exactly one inbox
create table if not exists api.payments
    partition of api.messages
    for values in ('payments');

create table if not exists api.store
    partition of api.messages
    for values in ('store');
-- (payments db) role the store service uses for its replication connection
-- NOTE: password equals the role name; demo credentials only
create role store with replication login password 'store';
grant all privileges on database payments to store; -- TODO: fine tune
-- table privileges alone are not enough: the role also needs usage on the
-- schema to reach api.* (e.g. for the initial table copy of a subscription)
grant usage on schema api to store;
grant all privileges on all tables in schema api to store; -- TODO: fine tune

-- (store db) role the payments service uses for its replication connection
-- NOTE: password equals the role name; demo credentials only
create role payments with replication login password 'payments';
grant all privileges on database store to payments; -- TODO: fine tune
-- same reasoning as above: schema usage is required in addition to table grants
grant usage on schema api to payments;
grant all privileges on all tables in schema api to payments; -- TODO: fine tune
-- (payments db) expose messages addressed to store; only inserts and deletes
-- are published
create publication store
    for table api.store
    with (publish = 'delete, insert');

-- (store db) expose messages addressed to payments; only inserts and deletes
-- are published
create publication payments
    for table api.payments
    with (publish = 'delete, insert');

-- (store db) pull the "store" publication from the payments database
create subscription payments_to_store
    connection 'host=payments port=5432 password=payments user=payments dbname=payments'
    publication store;

-- (payments db) pull the "payments" publication from the store database
create subscription store_to_payments
    connection 'host=store port=5432 password=store user=store dbname=store'
    publication payments;
-- Message acknowledgement, notification, and cleanup.
--
-- Row-level trigger function for a service's inbox partition, fired before
-- delete or insert. Returning null skips the triggering operation; returning
-- new lets the insert proceed.
--
-- Notification payloads are built inline as "<receiver>-<id>" (the same format
-- api.pk produces). NEW is a plpgsql record variable, so the attribute-style
-- call "new.pk" is resolved as a field lookup and fails at runtime with
-- 'record "new" has no field "pk"'.
create or replace function api.ack()
returns trigger language plpgsql as $$
begin
    -- The service deleted a message they sent us from their outbox
    if TG_OP = 'DELETE' then
        if old.type != 'ack' then
            -- Delete the associated ack message from our outbox
            delete from api.messages
            where reply = old.id
              and type = 'ack'
              and receiver = old.sender
              and sender = old.receiver;
        end if;

        -- Don't delete the message from our inbox
        return null;

    -- We're receiving acknowledgement of a message we sent earlier
    elsif new.type = 'ack' then
        -- Delete the original message from our outbox
        delete from api.messages
        where id = new.reply
          and receiver = new.sender
          and sender = new.receiver;

        -- Notify workers/threads waiting for acknowledgement of specific messages
        perform pg_notify(
            new.receiver || '.ack.' || new.reply,
            new.receiver || '-' || new.id::text
        );

        -- Don't save this acknowledgement message
        return null;

    -- We're receiving a new message
    else
        -- Acknowledge receipt of the message
        insert into api.messages (sender, receiver, type, reply)
        values (new.receiver, new.sender, 'ack', new.id);

        -- Notify workers/threads waiting for replies to specific messages
        if new.reply is not null then
            perform pg_notify(
                new.receiver || '.reply.' || new.reply,
                new.receiver || '-' || new.id::text
            );
        end if;

        -- Notify workers/threads waiting for new messages
        perform pg_notify(
            new.receiver::text,
            new.receiver || '-' || new.id::text
        );

        -- Deliver the message to our inbox
        return new;
    end if;
end;
$$;
-- (payments db) run api.ack for each row inserted into or deleted from the inbox
create trigger ack
    before delete or insert on api.payments
    for each row
    execute procedure api.ack();

-- (store db) run api.ack for each row inserted into or deleted from the inbox
create trigger ack
    before delete or insert on api.store
    for each row
    execute procedure api.ack();

-- triggers are not enabled for replication by default, so switch them to "always"
alter table api.payments
    enable always trigger ack;
alter table api.store
    enable always trigger ack;
-- calculate delivery latency for testing below
-- Takes a row of api.messages and returns received_at - sent_at as an
-- interval, or null when received_at is null.
-- Usage (attribute notation): select m.*, m.latency from api.messages m;
-- NOTE(review): api.messages as created earlier in this file has a "time"
-- column but no "sent_at" / "received_at" columns, while the sample output
-- below shows both. The table definition and this function look out of
-- sync; confirm which column set is intended before relying on this.
create function api.latency(api.messages)
returns interval stable language sql as $$
select case when $1.received_at is not null then $1.received_at - $1.sent_at end;
$$;
-- (store db) example: send a "charge" message to the payments service
insert into api.messages (receiver, type, data)
values (
    'payments',
    'charge',
    '{"usd":100,"user":123}'::jsonb
);
-- (payments db) select m.*, m.latency from api.messages m;
id | reply | sent_at | received_at | sender | receiver | type | data | latency
--------------------------------------+-------+-------------------------------+-------------------------------+--------+----------+--------+------------------------+-----------------
30c60d61-886d-42d5-a5ff-0adca10d6eba | | 2020-05-13 04:11:28.341731+00 | 2020-05-13 04:11:28.347607+00 | store | payments | charge | {"usd":100,"user":123} | 00:00:00.005876
-- (store db) select m.*, m.latency from api.messages m;
id | reply | sent_at | received_at | sender | receiver | type | data
----+-------+---------+-------------+--------+----------+------+------
(0 rows)
@shuber
Copy link
Author

shuber commented May 15, 2020

Visual workflow for "the store service sends a message to the payments service"

For clarity, the last segment of each step is used as the starting point for the following step.

Step 1 of 4 - send the first message

step_1

Step 2 of 4 - acknowledge receipt of the message

step_2

Step 3 of 4 - remove sent messages from outboxes

step_3

Step 4 of 4 - no-op the final replicated ack message deletion

step_4

@sahapasci
Copy link

simple suggestion;
-- Suggested variant of api.pk: joins receiver and id with '-' via concat_ws.
-- NOTE(review): concat_ws() is itself classified as STABLE in PostgreSQL, so
-- labelling this wrapper immutable promises more than its body guarantees;
-- confirm before adopting the volatility change.
create function api.pk(api.messages)
returns text immutable language sql as $$
select concat_ws('-', $1.receiver, $1.id::text);
$$;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment