Skip to content

Instantly share code, notes, and snippets.

@dynajoe
Created March 26, 2020 02:49
Show Gist options
  • Save dynajoe/0ab486d60a1e4fd683ac01b2633339a2 to your computer and use it in GitHub Desktop.
Save dynajoe/0ab486d60a1e4fd683ac01b2633339a2 to your computer and use it in GitHub Desktop.

Problem

When scheduling actions to be carried out at a specific time there are a wealth of options. Each option suitable for a very different use case. The aim of this description is to present a use case with requirements and a solution to event scheduling.

Requirements

  1. Scheduled events must be persisted
  2. Events can be cancelled
  3. Events can be rescheduled
  4. Events are easily traceable
  5. Upon failure, events should be retried without user intervention.
  6. Multiple events for the same entity should be considered

The landscape

Many solutions involve a dedicated process that loads events from a datastore and set timers in process. There are implementations on various technology stacks. These solutions seem very efficient but don't cover all of the requirements proposed.

RabbitMQ has a concept of message TTL's but require a more sophisticated setup for proper persistence and or queue configuration. Further, the requirement of multiple events for the same entity isn't easily achieved as there's no way to select and cancel future redundant events.

Solutions

Postgres is the swiss army knife for organizations that need fairly complex functionality without needing to maintain many different technologies. It has functionality for nearly every data use case with varying degrees of efficiency. It is my opinion that you should use Postgres until it just doesn't work for you. If that's the case then your business has likely grown to a point that it's reasonable to maintain additional technologies.

Given the proposition that Postgres can be used to achieve nearly any data related task let's see if we can achieve our requirements.

Scheduled Events must be persisted.

This one is easy. Persistence for postgres is a no brainer. And to top it off replication is possible with minimal configuration.

Events can be cancelled

If we think of events as rows in a table then they can add a flag to mark the row as cancelled or even delete the row. Our organization rarely deletes data in order to allow us to easily reconstruct a timeline of events.

This gives us our first opportunity to declare a structure. Let's add some obvious columns to support a basic queue table.

CREATE TABLE schedule_queue (
   schedule_queue_item_id bigserial NOT NULL,
   enqueued_on timestamptz DEFAULT now() NOT NULL,
   enqueued_by text NOT NULL,
   dequeued_on timestamptz,
   dequeued_by text,
   schedule_on timestamptz NOT NULL,
   data jsonb NOT NULL
)

Cancellation can be achieved by updating dequeued_on to now().

Events can be rescheduled

Given our current schema this can be achieved by updating the schedule_on to a different timestamp. But we don't have a way of selecting what we're trying to reschedule. We could have kept track of the schedule_queue_item_id but that requires storage of that key in another place. It may be better to use a natural key. Natural keys can be any unique text that can be derived from non-volatile facts about the entity. If you identify users via e-mail address (assuming this doesn't change) you could use that along with the purpose of the queue item. For example: welcome_message:personemail@email.com this would identify a scheduled welcome message to that email address.

CREATE TABLE schedule_queue (
   schedule_queue_item_id bigserial NOT NULL,
   enqueued_on timestamptz DEFAULT now() NOT NULL,
   enqueued_by text NOT NULL,
   dequeued_on timestamptz,
   dequeued_by text,
   schedule_on timestamptz NOT NULL,
   data jsonb NOT NULL,
   item_key text NOT NULL
);

CREATE UNIQUE INDEX unique_schedule_queue_item_key_queued_on
   ON schedule_queue (item_key) WHERE dequeued_on IS NULL;

I've also thrown in here a unique index to prevent multiple items from being enqueued with the same key. The unique index also makes for efficient lookup.

Let's see what it might take to schedule and reschedule an event at this time.

INSERT INTO schedule_queue (
    enqueued_by,
    schedule_on,
    data,
    item_key
) VALUES (
    'engineering-blog',
    now() + '1 hour',
    '{ "message": "Welcome to the engineering blog" }',
    'welcome_message:personemail@email.com'
);

UPDATE schedule_queue 
SET schedule_on = now() + '2 hours'
WHERE item_key = 'welcome_message:personemail@email.com';

-- Whoops let's not send that afterall

UPDATE schedule_queue 
SET dequeued_on = now(),
    dequeued_by = 'engineering-blog'
WHERE item_key = 'welcome_message:personemail@email.com' 
    AND dequeued_on IS NULL;

Easy enough. We now have a persisted queue that allows easy lookup of items for rescheduling or cancellation.

Events are easily traceable

This is handled given the fact that we've set up several metadata columns such as enqueued_on, enqueued_by, dequeued_on, dequeued_by.

Upon failure, events should be retried without user intervention.

One thing we could do is not update dequeued on until the event is successfully processed. Failure may happen in our example if the email servers are have an intermittent failure. But this leads to a possibly infinite number of attempts. Let's add a column to keep track of how many times an event has been attempted.

ALTER TABLE schedule_queue
ADD COLUMN redeliver_count integer DEFAULT 0 NOT NULL;

Now we can increment this column when processing fails. Going a bit further, we could add metadata to describe the nature of the failure.

Observations

  1. You may have noticed there's no way to claim messages and could lead to multiple readers processing the same message.

  2. Okay, assuming you can claim messages how do you ensure that messages aren't claimed and never processed due to failure?

There are a number of things we can do to claim the message. Adding another timestamp column to mark when an item has been claimed and filtering those out when looking for work. This would require a concept of processing timeout. You could also do this with a simple query filter. For example:

ALTER TABLE schedule_queue
ADD COLUMN claimed_on timestamptz,
ADD COLUMN claimed_by text;

UPDATE schedule_queue  
SET claimed_on = now(),
    claimed_by = 'engineering-blog-worker-1'
WHERE item_key = 'welcome_message:personemail@email.com';

Now finding events to process might look something like this:

SELECT * 
FROM schedule_queue
WHERE dequeued_on IS NULL 
        AND (claimed_on IS NULL OR (now() - '5 minutes'::interval) > claimed_on)
    AND schedule_on <= now()
LIMIT 1;

** If you're not getting rows the item schedule_on likely hasn't elapsed **

This is nice but what if two workers claim the same message AT THE SAME TIME?

This never happens.

|> |> |> |> |>

Wrong.

Select for update skip locked

WITH selected_row as (
    SELECT schedule_queue_item_id
    FROM schedule_queue
    WHERE dequeued_on IS NULL 
        AND (claimed_on IS NULL OR (now() - '5 seconds'::interval) > claimed_on)
        AND schedule_on <= now()    
    FOR UPDATE SKIP LOCKED
    LIMIT 1
)
UPDATE schedule_queue 
SET claimed_on = now(), claimed_by = 'engineering-blog-worker-1'
WHERE schedule_queue_item_id IN (select schedule_queue_item_id FROM selected_row);

Now all that's left is something that periodically polls this table to pick up work. Another step might be listening for inserts and maintaining a timer to know when is most efficient to check the table again. We'll explore this in the addendum.

I want my worker to consider ALL events for the same entity.

Let's add a way to group items. Perhaps we want to evaluate all scheduled events for a given entity. Say all emails intended for a specific email address.

ALTER TABLE schedule_queue
ADD column group_key text;

And some test data

INSERT INTO schedule_queue (
    enqueued_by,
    schedule_on,
    data,
    item_key,
    group_key
) VALUES (
    'engineering-blog-team',
    now(),
    '{ "message": "Welcome to the engineering blog" }',
    'welcome_message:personemail@email.com',
    'personemail@email.com'
), (
    'engineering-blog-signup',
    now(),
    '{ "message": "To get started do these three simple steps!" }',
    'getting_started_instructions:personemail@email.com',
    'personemail@email.com'
);

Now, let's modify the query to use our group key.

WITH selected_row as (
    SELECT array_agg(schedule_queue_item_id)
    FROM schedule_queue
    WHERE dequeued_on IS NULL 
        AND (claimed_on IS NULL OR (now() - '5 seconds'::interval) > claimed_on)    
    GROUP BY group_key
    LIMIT 1
    FOR UPDATE SKIP LOCKED
)
UPDATE schedule_queue 
SET claimed_on = now(), claimed_by = 'engineering-blog-worker-1'
WHERE schedule_queue_item_id = ANY(select schedule_queue_item_id FROM selected_row);

Now run this query... go ahead... do it....

|>|>

ERROR: FOR UPDATE is not allowed with GROUP BY clause

Crap. Everything was working so well... now what? We can't remove that or we'll get in trouble with multiple readers. Since we can no longer use SELECT...FOR UPDATE because of the new requirement to evaluate all outstanding events we need to look elsewhere.

Postgres Advisory Locks

Advisory locks allow us to for obtain locks for application specific purposes arbitrary values.

CREATE OR REPLACE FUNCTION schedule_queue_claim()
RETURNS SETOF schedule_queue
AS $$
DECLARE
   v_cursor CURSOR FOR
      SELECT schedule_queue_item_id, group_key, item_key
      FROM schedule_queue
      WHERE dequeued_on IS NULL
         AND schedule_on <= now();
BEGIN
   FOR x IN v_cursor LOOP
      IF (pg_try_advisory_xact_lock(hashtext('schedule_queue'), hashtext(x.group_key))) THEN
         RETURN QUERY
         SELECT *
         FROM schedule_queue
         WHERE dequeued_on IS NULL
            AND schedule_on <= now()
            AND group_key = x.group_key;

         EXIT WHEN FOUND;
      END IF;
   END LOOP;
END;
$$ LANGUAGE plpgsql STABLE;

The key to this function is acquiring an advisory lock on the group key. This is accomplished by trying each elapsed event one by one. In order to make full use of this your calling function will need to establish a transaction that lives as long as you need to process the event. When pg_try_advisory_xact_lock returns true it acquires the requested lock and holds it until the end of the transaction. Postgres supports tens or hundreds of thousands of simultaneous advisory locks and, in general, advise using advisory locks over a claimed_on column.

BEGIN;

SELECT * 
FROM schedule_queue_claim();

-- IN A SEPARATE OR SAME TRANSACTION DO WORK.

-- Upon success
UPDATE common.schedule_queue
SET
   dequeued_on = now(),
   dequeued_by = 'engineering-blog-worker-1'
WHERE schedule_queue_item_id = ANY('{1,2,3,4}'::bigint[])
RETURNING *;

-- Upon failure (better in a separate transaction)

UPDATE common.schedule_queue
SET redeliver_count = redeliver_count + 1
WHERE schedule_queue_item_id = ANY('{1,2,3,4}'::bigint[]);

COMMIT;

Separate transaction or same?

Whether or not to use separate transaction for your database updates is up to your use case. The major consideration to make is whether or not your queries to the database have the potential to abort the transaction.

When failures happen you'll want to make sure that you have the ability to increment the redeliver count.

Open transaction A -> Claim message (A) -> Keep open
   Perform any work using Transaction A, send email, update records, etc.
COMMIT

Perform any non-retractible side effects.    
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment