-
-
Save shuber/8e53d42d0de40e90edaf4fb182b59dfc to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- namespace for everything message related
create schema if not exists api;

-- citext: case-insensitive text, used for service names and message types
-- pgcrypto: supplies gen_random_uuid() for message ids
--   (NOTE(review): gen_random_uuid() is built in from PostgreSQL 13 on; confirm target version)
create extension if not exists citext with schema public;
create extension if not exists pgcrypto with schema public;
-- parent partitioned table for all messages; rows are stored in the
-- per-receiver partitions attached to it
create table if not exists api.messages (
    -- random uuids, so ids will not conflict across services
    id uuid not null default gen_random_uuid(),
    -- id of the message this one replies to (null when it is not a reply)
    reply uuid,
    -- when was this message initially created
    time timestamptz not null default now(),
    -- the sending service's name; defaults to the name of the current database
    sender citext not null default current_database(),
    -- the receiving service's name (also the partition key)
    receiver citext not null,
    -- type of the message (services define these)
    type citext not null,
    -- message payload
    data jsonb not null default '{}'::jsonb,
    -- payloads must be JSON objects (not arrays or scalars)
    check (jsonb_typeof(data) = 'object'),
    -- partition keys must be part of the primary key
    primary key (receiver, id)
-- automatically route messages to service specific tables
) partition by list (receiver);
-- generate the composite primary key of a message as text, in the form
-- '<receiver>-<id>'; used as the payload of notifications
-- usage: select m.*, m.pk from api.messages m;
--
-- "or replace" keeps the script re-runnable, like the "if not exists" DDL above.
-- The result depends only on the argument row (both casts to text are
-- immutable), so the function is labelled immutable rather than stable.
create or replace function api.pk(api.messages)
returns text immutable language sql as $$
    select $1.receiver || '-' || $1.id::text;
$$;
-- inbox/outbox table partitions for our "payments" and "store" services
-- (one partition per receiver value of api.messages)
-- note: there's only one inbox per database
create table if not exists api.payments
    partition of api.messages for values in ('payments');
create table if not exists api.store
    partition of api.messages for values in ('store');
-- NOTE(review): the passwords below equal the role names; they are demo
-- placeholders and must be replaced before any real deployment.

-- create a store replication user in the payments database
create role store with replication login password 'store';
grant all privileges on database payments to store; -- TODO: fine tune
grant all privileges on all tables in schema api to store; -- TODO: fine tune

-- create a payments replication user in the store database
create role payments with replication login password 'payments';
grant all privileges on database store to payments; -- TODO: fine tune
grant all privileges on all tables in schema api to payments; -- TODO: fine tune
-- publish outgoing store messages in the payments database
-- (only inserts and deletes are published; updates are not)
create publication store
    for table api.store
    with (publish = 'delete, insert');

-- publish outgoing payments messages in the store database
create publication payments
    for table api.payments
    with (publish = 'delete, insert');
-- in the store database: subscribe to the "store" publication served by the
-- payments host, i.e. receive the messages payments sends to store
create subscription payments_to_store
    connection 'host=payments port=5432 password=payments user=payments dbname=payments'
    publication store;

-- in the payments database: subscribe to the "payments" publication served by
-- the store host, i.e. receive the messages store sends to payments
create subscription store_to_payments
    connection 'host=store port=5432 password=store user=store dbname=store'
    publication payments;
-- message acknowledgement, notification, and cleanup
--
-- Row-level trigger function meant to run BEFORE INSERT OR DELETE on a
-- service's inbox partition. It handles three cases:
--   * DELETE            - the sender removed a message from its outbox; drop our
--                         matching ack from our outbox and keep the inbox row
--   * INSERT of an ack  - drop the acknowledged message from our outbox, notify
--                         waiters, and discard the ack row itself
--   * INSERT otherwise  - queue an ack back to the sender, notify waiters, and
--                         store the message
-- Notification payloads are the message key built by api.pk(). It is called
-- as api.pk(new): the attribute form new.pk is looked up as a field of the
-- record variable in PL/pgSQL and raises 'record "new" has no field "pk"'.
create or replace function api.ack()
returns trigger language plpgsql as $$
begin
    -- The service deleted a message they sent us from their outbox
    if TG_OP = 'DELETE' then
        if old.type != 'ack' then
            -- Delete the associated ack message from our outbox
            delete from api.messages
            where reply = old.id
              and type = 'ack'
              and receiver = old.sender
              and sender = old.receiver;
        end if;

        -- Returning null from a before-delete trigger skips the delete:
        -- don't delete the message from our inbox
        return null;

    -- We're receiving acknowledgement of a message we sent earlier
    elsif new.type = 'ack' then
        -- Delete the original message from our outbox
        delete from api.messages
        where id = new.reply
          and receiver = new.sender
          and sender = new.receiver;

        -- Notify workers/threads waiting for acknowledgement of specific messages
        -- (channel: '<receiver>.ack.<id of the acknowledged message>')
        perform pg_notify(new.receiver || '.ack.' || new.reply, api.pk(new));

        -- Returning null from a before-insert trigger skips the insert:
        -- don't save this acknowledgement message
        return null;

    -- We're receiving a new message
    else
        -- Acknowledge receipt of the message
        insert into api.messages (sender, receiver, type, reply)
        values (new.receiver, new.sender, 'ack', new.id);

        -- Notify workers/threads waiting for replies to specific messages
        -- (channel: '<receiver>.reply.<id of the message being replied to>')
        if new.reply is not null then
            perform pg_notify(new.receiver || '.reply.' || new.reply, api.pk(new));
        end if;

        -- Notify workers/threads waiting for new messages
        -- (channel: the receiving service's name)
        perform pg_notify(new.receiver::text, api.pk(new));

        -- Deliver the message to our inbox
        return new;
    end if;
end;
$$;
-- attach the ack trigger to the inbox table in the payments db
create trigger ack
    before delete or insert on api.payments
    for each row execute procedure api.ack();

-- attach the ack trigger to the inbox table in the store db
create trigger ack
    before delete or insert on api.store
    for each row execute procedure api.ack();

-- "enable always" is required since triggers aren't fired by default for
-- changes applied through replication
alter table api.payments enable always trigger ack;
alter table api.store enable always trigger ack;
-- calculate delivery latency (received_at - sent_at) for testing below;
-- yields null while received_at is null
-- usage: select m.*, m.latency from api.messages m;
--
-- NOTE(review): this reads sent_at and received_at, but the api.messages
-- definition in this file declares a single "time" column instead. Confirm the
-- table really has both columns, otherwise creating this function fails.
create or replace function api.latency(api.messages)
returns interval stable language sql as $$
    select case
        when $1.received_at is not null then $1.received_at - $1.sent_at
    end;
$$;
-- let's send a message to the payments service from the store database
insert into api.messages (receiver, type, data)
values ('payments', 'charge', '{"usd":100,"user":123}'::jsonb);

-- sample results, kept as comments so this file stays valid SQL

-- (payments db) select m.*, m.latency from api.messages m;
-- id | reply | sent_at | received_at | sender | receiver | type | data | latency
-- --------------------------------------+-------+-------------------------------+-------------------------------+--------+----------+--------+------------------------+-----------------
-- 30c60d61-886d-42d5-a5ff-0adca10d6eba | | 2020-05-13 04:11:28.341731+00 | 2020-05-13 04:11:28.347607+00 | store | payments | charge | {"usd":100,"user":123} | 00:00:00.005876

-- (store db) select m.*, m.latency from api.messages m;
-- id | reply | sent_at | received_at | sender | receiver | type | data
-- ----+-------+---------+-------------+--------+----------+------+------
-- (0 rows)
Other discussions:
- https://old.reddit.com/r/PostgreSQL/comments/gkdp6p/logical_replication_for_async_service/
- https://www.postgresql.org/message-id/CAM8f5Mi1Ftj%2B48PZxN1AbM-P%3D4YMLENY5zRaPwTbmbkFwCsTkA%40mail.gmail.com
- https://dba.stackexchange.com/questions/267266/postgresql-logical-replication-for-async-service-communication
- https://news.ycombinator.com/item?id=23209221
Simple suggestion:
-- Suggested variant of api.pk: build '<receiver>-<id>' and label the function
-- immutable. The key is concatenated with || instead of concat_ws() because
-- concat_ws() is marked STABLE in PostgreSQL, which would contradict the
-- immutable label. "or replace" lets this supersede an existing api.pk
-- definition instead of failing on it.
create or replace function api.pk(api.messages)
returns text immutable language sql as $$
    select $1.receiver::text || '-' || $1.id::text;
$$;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Visual workflow for "the store service sends a message to the payments service"
For clarity, the last segment of each step is used as the starting point for the following step.
Step 1 of 4 - send the first message
Step 2 of 4 - acknowledge receipt of the message
Step 3 of 4 - remove sent messages from outboxes
Step 4 of 4 - no-op the final replicated ack message deletion