Skip to content

Instantly share code, notes, and snippets.

@vadv
Last active January 29, 2020 08:58
Show Gist options
  • Save vadv/294fb98b76c44aba800f4065bb00e79b to your computer and use it in GitHub Desktop.
Save vadv/294fb98b76c44aba800f4065bb00e79b to your computer and use it in GitHub Desktop.
/* list of order */
create table "order" (
id text primary key,
created_at timestamp with time zone default current_timestamp,
processed_at timestamp with time zone,
processed_state text
);
/* processed transition on order.id */
create table order_events (
id bigserial,
order_id text references "order" (id),
operation text not null,
state text not null,
processed_at timestamp with time zone not null default current_timestamp
)
/*
partition by range (processed_at);
create index order_events_order_id_idx on order_events(order_id);
create table order_events_2019_04 partition of order_events for values from ('2019-04-01') to ('2019-04-30');
*/;
/* list of allowed fsm transition */
create table order_event_transition (
operation text not null,
from_state text not null,
to_state text not null,
PRIMARY KEY (operation, from_state)
);
/* hot table for unresolved order's and queue processing */
create table order_pending (
id bigserial primary key,
order_id text unique,
created_at timestamp with time zone default current_timestamp,
locked_at timestamp with time zone
);
create or replace function try_obtain_lock_order_id(id text) returns void as $$
begin
/* try to lock order_id */
perform pg_try_advisory_xact_lock(('x'||substr(md5($1),1,16))::bit(64)::bigint);
end
$$ language 'plpgsql';
create or replace function order_operation_for(order_id text) returns text as $$
/*
get allowed operations for specified order_id
*/
with current_state as (
select state from order_events e where e.order_id = $1 order by processed_at limit 1
)
select
o.operation
from
order_event_transition o, current_state c on c.state = o.from_state
$$ language 'sql';
create or replace function order_events_check_valid_insert() returns trigger as $$
begin
perform try_obtain_lock_order_id(new.order_id);
-- check new state
if new.state not in (select to_state from order_event_transition where operation = new.state) then
raise 'cant perform operation "%" to state "%"', new.operation, new.state using errcode = 'unique_violation';
end if;
-- check current state
if new.operation not in (select * from order_operation_for(new.order_id)) then
raise 'cant perform operation "%" to current order state', new.operation using errcode = 'unique_violation';
end if;
return new;
end
$$ language 'plpgsql';
create constraint trigger fsm_order_events_check_valid_insert_trigger
after insert on order_events for each row
execute procedure order_events_check_valid_insert();
insert into order_event_transition (operation, from_state, to_state) values ('pending', 'create', 'create');
insert into order_event_transition (operation, from_state, to_state) values ('start', 'create', 'awaiting_payment');
insert into order_event_transition (operation, from_state, to_state) values ('pay', 'awaiting_payment', 'awaiting_shipment');
insert into order_event_transition (operation, from_state, to_state) values ('ship', 'awaiting_shipment', 'shipped');
insert into order_event_transition (operation, from_state, to_state) values ('cancel', 'awaiting_shipment', 'awaiting_refund');
insert into order_event_transition (operation, from_state, to_state) values ('refund', 'awaiting_refund', 'canceled');
/* pending */
begin;
select try_obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
insert into "order" (id) values ('a929b8ce50234aa3ec428ed8db640622');
insert into order_events (order_id, operation, state) values ('a929b8ce50234aa3ec428ed8db640622', 'pending', 'create');
insert into order_pending (order_id) values ('a929b8ce50234aa3ec428ed8db640622');
commit;
/* get unprocessed from queue */
begin;
select
order_id, try_obtain_lock_order_id(order_id)
from order_pending where locked_at is null for update skip locked limit 10;
-- load order_id to application
update order_pending set locked_at = current_timestamp where order_id in (...);
commit;
/* start */
begin;
select try_obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
insert into order_events (order_id, operation, state) values ('a929b8ce50234aa3ec428ed8db640622', 'start', 'awaiting_payment');
commit;
/* pay */
begin;
select try_obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
insert into order_events (order_id, operation, state) values ('a929b8ce50234aa3ec428ed8db640622', 'pay', 'awaiting_shipment');
commit;
/* ship */
begin;
select try_obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
insert into order_events (order_id, operation, state) values ('a929b8ce50234aa3ec428ed8db640622', 'ship', 'shipped');
update "order" set processed_state = 'shipped', processed_at = current_timestamp where id = 'a929b8ce50234aa3ec428ed8db640622';
delete from order_pending where order_id = 'a929b8ce50234aa3ec428ed8db640622';
commit;
/* get history */
begin;
select try_obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
select
o.id,
o.created_at,
e.operation,
e.state,
e.processed_at
from
"order" o
inner join "order_events" e on e.order_id = o.id
where o.id = 'a929b8ce50234aa3ec428ed8db640622'
order by e.processed_at;
commit;
/*
id | created_at | operation | state | processed_at
----------------------------------+-------------------------------+-----------+-------------------+-------------------------------
a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | pending | create | 2019-04-09 16:46:13.730044+03
a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | start | awaiting_payment | 2019-04-09 16:46:13.737338+03
a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | pay | awaiting_shipment | 2019-04-09 16:46:13.738623+03
a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | ship | shipped | 2019-04-09 16:46:13.739792+03
*/
@al3rez
Copy link

al3rez commented Jan 29, 2020

create or replace function order_operation_for(order_id text) returns text as $$
    /*
        get allowed operations for specified order_id
    */
    with current_state as (
        select state from order_events e where e.order_id = $1 order by processed_at limit 1
    )
    select
        o.operation
    from
        order_event_transition o, current_state c on c.state = o.from_state
$$ language 'sql';

This doesn't work? I guess there's an issue with secondary SELECT due to aliases.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment