@vadv
Created January 29, 2020 07:44

Finite state machine in PostgreSQL

This document does not discuss why you might need a finite state machine (FSM).

If you do implement one, keep it as close to the data as possible.

This document explains how to implement an FSM in PostgreSQL in a way that leaves developers as much freedom as possible.

Schema

Order

We create two normalized tables: one stores information about each order, the other stores the events (state transitions) recorded for each order.id.

/* list of order */
create table "order" (
    id              text primary key,
    created_at      timestamp with time zone default current_timestamp,
    processed_at    timestamp with time zone,
    processed_state text
);

/* processed transition on order.id */
create table order_events (
    id        bigserial,
    order_id  text references "order" (id),
    operation text not null,
    state     text not null,
    processed_at timestamp with time zone not null default current_timestamp
);

Transition

In the table order_event_transition we store the allowed transitions from one state to another:

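/* list of allowed fsm transitions */
create table order_event_transition (
    operation  text not null,
    /* null means "no previous state", i.e. the first event of an order */
    from_state text,
    to_state   text not null
);

/* a primary key cannot contain null, so enforce uniqueness with an index */
create unique index order_event_transition_operation_from_state_idx
    on order_event_transition (operation, coalesce(from_state, '<null>'));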

Hot table for queue

In the table order_pending we store the orders that still need to be processed. Keeping this queue in a separate, small table confines the frequent inserts, updates and deletes to it and avoids bloating the main tables.

/* hot table for unresolved orders and queue processing */
create table order_pending (
    id         bigserial primary key,
    order_id   text unique,
    created_at timestamp with time zone default current_timestamp,
    locked_at  timestamp with time zone
);
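
To see whether the hot table is actually absorbing the churn, one option is to compare live and dead tuple counts in PostgreSQL's statistics views. This is only a monitoring sketch and not part of the original schema; the counts are estimates and the query simply returns no rows for tables that do not exist yet.

```sql
/* dead tuples are expected to pile up in order_pending rather than in the main tables */
select relname, n_live_tup, n_dead_tup, last_autovacuum
from pg_stat_user_tables
where relname in ('order', 'order_events', 'order_pending')
order by relname;
```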

Lock order.id

Helper function that locks a specific order.id for the duration of the current transaction, to avoid race conditions.

create or replace function obtain_lock_order_id(id text) returns void as $$
begin
    /* lock order_id */
    perform pg_advisory_xact_lock(('x'||substr(md5($1),1,16))::bit(64)::bigint);
end
$$ language 'plpgsql';

Helper function that tries to lock a specific order.id. It is used to fetch unprocessed orders. The only difference is that it does not wait for the lock to be released: it returns false if the lock is already taken.

create or replace function try_obtain_lock_order_id(id text) returns bool as $$
begin
    return (select pg_try_advisory_xact_lock(('x'||substr(md5($1),1,16))::bit(64)::bigint));
end
$$ language 'plpgsql';
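
Both helpers derive the lock key in the same way: the first 16 hex digits of md5(order id) are cast to a bigint. The sketch below applies that expression to a made-up id ('demo-order-id' is only an illustration) and then looks the lock up in pg_locks, which can help when debugging a worker that appears to be stuck.

```sql
begin;
/* same key expression as in obtain_lock_order_id, applied to a made-up id */
select pg_advisory_xact_lock(('x' || substr(md5('demo-order-id'), 1, 16))::bit(64)::bigint);

/* a bigint advisory key is shown split in two: high half in classid, low half in objid */
select classid, objid, objsubid, mode, granted
from pg_locks
where locktype = 'advisory' and pid = pg_backend_pid();
commit; /* transaction-level advisory locks are released here */
```

Because only 64 bits of the hash are used, two different ids could in principle map to the same key. That would only make one transaction wait longer than necessary; it would not let two transactions modify the same order at once.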

FSM consistency

This is a minimal example that checks whether a state transition is allowed. It runs automatically as a constraint trigger:

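begin;
/* transitions allowed from the state the order had before its latest event */
create or replace function allowed_transition(order_id text) returns setof order_event_transition as $$
    select o.* from order_event_transition o where
        /* the trigger fires after insert, so the latest event is the new row: skip it.
           no earlier event gives null, which matches a null from_state */
        o.from_state is not distinct from (
            select e.state
                from order_events e where e.order_id = $1
                order by e.processed_at desc, e.id desc offset 1 limit 1);
$$ language 'sql';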

create or replace function order_events_check_valid_insert() returns trigger as $$
begin
    perform obtain_lock_order_id(new.order_id);
    -- check new state
    if new.state not in (select to_state from order_event_transition where operation = new.operation) then
        raise 'cant perform operation "%" to state "%"', new.operation, new.state using errcode = 'unique_violation';
    end if;
    -- check current state
    if not exists (select 1 from allowed_transition(new.order_id) where operation = new.operation and to_state = new.state) then
        raise 'cant perform operation "%" to current order state', new.operation using errcode = 'unique_violation';
    end if;
    return new;
end
$$ language 'plpgsql';

create constraint trigger fsm_order_events_check_valid_insert_trigger
    after insert on order_events for each row
    execute procedure order_events_check_valid_insert();
commit;
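
The trigger reports rejected events with the SQLSTATE of unique_violation, so callers can catch that condition. The sketch below tries an operation that is not listed in order_event_transition; the order id 'demo-unknown-op' and the operation 'teleport' are made up for the illustration. When the exception is caught, everything done inside the block is rolled back, including the demo order.

```sql
do $$
begin
    /* the event needs an existing order because of the foreign key */
    insert into "order" (id) values ('demo-unknown-op');
    /* 'teleport' is not a known operation, so the trigger's first check raises */
    insert into order_events (order_id, operation, state)
        values ('demo-unknown-op', 'teleport', 'shipped');
exception when unique_violation then
    raise notice 'rejected: %', sqlerrm;
end
$$;
```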

Usage example

Seed

Create allowed transitions:

insert into order_event_transition (operation, from_state, to_state) values ('pending', null,                'create');
insert into order_event_transition (operation, from_state, to_state) values ('start',   'create',            'awaiting_payment');
insert into order_event_transition (operation, from_state, to_state) values ('pay',     'awaiting_payment',  'awaiting_shipment');
insert into order_event_transition (operation, from_state, to_state) values ('ship',    'awaiting_shipment', 'shipped');
insert into order_event_transition (operation, from_state, to_state) values ('cancel',  'awaiting_shipment', 'awaiting_refund');
insert into order_event_transition (operation, from_state, to_state) values ('refund',  'awaiting_refund',   'canceled');
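
Since the transitions are plain rows, the state graph can be inspected with an ordinary query. As a small illustration, assuming the seed rows above are loaded, this lists the operations available to an order that is awaiting shipment:

```sql
/* which operations can be applied to an order in state awaiting_shipment? */
select operation, to_state
from order_event_transition
where from_state = 'awaiting_shipment'
order by operation;

/*
 operation |    to_state
-----------+-----------------
 cancel    | awaiting_refund
 ship      | shipped
*/
```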

Pending

begin;
select obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
insert into "order" (id) values ('a929b8ce50234aa3ec428ed8db640622');
insert into order_events (order_id, operation, state) values ('a929b8ce50234aa3ec428ed8db640622', 'pending', 'create');
insert into order_pending (order_id) values ('a929b8ce50234aa3ec428ed8db640622');
commit;

Get unprocessed from queue

begin;
select
  order_id
from order_pending where locked_at is null and try_obtain_lock_order_id(order_id) limit 10;
-- load order_id to application
update order_pending set locked_at = current_timestamp where order_id in (...);
commit;
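
Note that locked_at survives the commit, while the advisory lock does not. The document does not say what should happen if a worker dies after claiming orders; one possible approach, shown here only as a sketch, is to release claims older than some timeout. The 5 minute value is an assumption and would need to exceed the longest expected processing time, otherwise a slow worker and a new worker could pick up the same order.

```sql
begin;
/* release claims older than an assumed 5 minute timeout so another worker can pick them up */
update order_pending
set locked_at = null
where locked_at < current_timestamp - interval '5 minutes'
returning order_id;
commit;
```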

Start

begin;
select obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
insert into order_events (order_id, operation, state) values ('a929b8ce50234aa3ec428ed8db640622', 'start', 'awaiting_payment');
commit;

Pay

begin;
select obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
insert into order_events (order_id, operation, state) values ('a929b8ce50234aa3ec428ed8db640622', 'pay', 'awaiting_shipment');
commit;

Ship

begin;
select obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
insert into order_events (order_id, operation, state) values ('a929b8ce50234aa3ec428ed8db640622', 'ship', 'shipped');
update "order" set processed_state = 'shipped', processed_at = current_timestamp where id = 'a929b8ce50234aa3ec428ed8db640622';
delete from order_pending where order_id = 'a929b8ce50234aa3ec428ed8db640622';
commit;

History of order.id

/* get history */
begin;
select obtain_lock_order_id('a929b8ce50234aa3ec428ed8db640622');
select
    o.id,
    o.created_at,
    e.operation,
    e.state,
    e.processed_at
from
    "order" o
    inner join "order_events" e on e.order_id = o.id
where o.id = 'a929b8ce50234aa3ec428ed8db640622'
order by e.processed_at;
commit;

/*
                id                |          created_at           | operation |       state       |         processed_at
----------------------------------+-------------------------------+-----------+-------------------+-------------------------------
 a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | pending   | create            | 2019-04-09 16:46:13.730044+03
 a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | start     | awaiting_payment  | 2019-04-09 16:46:13.737338+03
 a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | pay       | awaiting_shipment | 2019-04-09 16:46:13.738623+03
 a929b8ce50234aa3ec428ed8db640622 | 2019-04-09 16:46:13.730044+03 | ship      | shipped           | 2019-04-09 16:46:13.739792+03
*/
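
The same event log also yields the current state of every order: it is the state of the latest event. A minimal sketch using distinct on, with id as a tie-breaker for events that share a processed_at value (for example when they were written in one transaction):

```sql
/* latest event per order = current state of that order */
select distinct on (e.order_id)
    e.order_id,
    e.state,
    e.processed_at
from order_events e
order by e.order_id, e.processed_at desc, e.id desc;
```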

al3rez commented Jan 30, 2020

Your first seed state doesn't work,

insert into order_event_transition (operation, from_state, to_state) values ('pending', null,                'create');

I guess null should be <null> or maybe the from_state type is wrong FYI.
