Skip to content

Instantly share code, notes, and snippets.

@lotas
Last active September 14, 2023 16:22
Show Gist options
  • Save lotas/57779afe067e4f8eb586f37fe25016c9 to your computer and use it in GitHub Desktop.
Save lotas/57779afe067e4f8eb586f37fe25016c9 to your computer and use it in GitHub Desktop.
taskcluster queue migration
-- Groundwork for a later migration: keep message details in plain, queryable
-- columns on azure_queue_messages. All three columns are nullable, so existing
-- rows are unaffected.
ALTER TABLE azure_queue_messages
    ADD COLUMN task_queue_id text,    -- readable queue id instead of the cryptic queue name
    ADD COLUMN priority      integer, -- task priority, kept as its own column
    ADD COLUMN payload       jsonb;   -- message body as jsonb instead of a base64 encoded string
-- patch functions to use new columns
-- azure_queue_put_extra: enqueue a message together with its readable details.
--
-- Inserts one row into azure_queue_messages with a newly generated message_id
-- and inserted = now(), then issues NOTIFY on a channel named after the queue.
--
-- Parameters:
--   queue_name    - value for the queue_name column; also the NOTIFY channel
--   message_text  - value for the message_text column
--   visible       - value for the visible column
--   expires       - value for the expires column
--   task_queue_id - value for the task_queue_id column
--   priority      - value for the priority column
--   payload       - value for the payload column (jsonb)
-- Returns: nothing.
CREATE OR REPLACE FUNCTION public.azure_queue_put_extra(queue_name text, message_text text, visible timestamp without time zone, expires timestamp without time zone, task_queue_id text, priority int, payload jsonb)
RETURNS void
LANGUAGE plpgsql
AS $function$begin
  -- Parameters share their names with table columns, so every value is
  -- qualified with the function name to make clear which one is meant.
  insert into azure_queue_messages (
    queue_name,
    message_id,
    message_text,
    inserted,
    visible,
    expires,
    task_queue_id,
    priority,
    payload
  ) values (
    azure_queue_put_extra.queue_name,
    public.gen_random_uuid(),
    azure_queue_put_extra.message_text,
    now(),
    azure_queue_put_extra.visible,
    azure_queue_put_extra.expires,
    azure_queue_put_extra.task_queue_id,
    azure_queue_put_extra.priority,
    azure_queue_put_extra.payload
  );
  -- NOTIFY takes an identifier rather than a bind value, so the statement is
  -- built dynamically; quote_ident keeps the queue name from being read as SQL.
  execute 'notify ' || quote_ident(queue_name);
end$function$;
-- azure_queue_update_extra: rewrite an existing message in place.
--
-- Sets message_text, visible and payload on the row of azure_queue_messages
-- that matches queue_name, message_id and pop_receipt. When no row matches
-- all three, nothing is updated and no error is raised.
--
-- Parameters:
--   queue_name   - queue the message belongs to (match condition)
--   message_text - new value for the message_text column
--   message_id   - id of the message to update (match condition)
--   pop_receipt  - pop receipt the row must currently hold (match condition)
--   visible      - new value for the visible column
--   payload      - new value for the payload column (jsonb)
-- Returns: nothing.
CREATE OR REPLACE FUNCTION public.azure_queue_update_extra(queue_name text, message_text text, message_id uuid, pop_receipt uuid, visible timestamp without time zone, payload jsonb)
RETURNS void
LANGUAGE plpgsql
AS $function$begin
  -- Columns are referenced through the msgs alias and parameters through the
  -- function name, because both sides use the same identifiers.
  update azure_queue_messages msgs
  set message_text = azure_queue_update_extra.message_text,
      visible = azure_queue_update_extra.visible,
      payload = azure_queue_update_extra.payload
  where msgs.queue_name = azure_queue_update_extra.queue_name
    and msgs.message_id = azure_queue_update_extra.message_id
    and msgs.pop_receipt = azure_queue_update_extra.pop_receipt;
end$function$;
-- live migration strategy:
-- 0. locking azure_queue_messages for reading until data is migrated
-- 1. create table for resolved and deadline queues
-- 2. move data for resolved-queue, deadline-queue to the new tables
-- 3. create table for pending + claimed tasks
-- 4. move data for pending tasks to the new table
-- ^^ `-{provisionerId hash}-{workerType hash}-{priority}` hashes would need to be pre-calculated to know who is who
-- hash is sha256 base32 [0..15] (first 16 chars)
-- hard to calculate in SQL, probably we'll have to take a snapshot before the migration and encode all possible values
-- 5. unlock and delete azure_queue_messages
-- background tasks would be using the new tables, so once existing records are moved, they will pick up those messages
-- the queue api would already be using the new tables after deployment, so new data will go into those tables directly
begin
  -- NOTE(review): this is a PL/pgSQL block body (it uses IF / RAISE), not plain
  -- SQL. Presumably the migration tooling wraps it and substitutes the
  -- $db_user_prefix$ placeholder before execution -- confirm.

  -- Step 0 of the strategy: take exclusive access BEFORE inspecting the table.
  -- ACCESS EXCLUSIVE blocks concurrent reads and writes until the transaction
  -- ends, so no old-format row can be added between the check below and the
  -- copy statements that follow.
  LOCK TABLE azure_queue_messages IN ACCESS EXCLUSIVE MODE;

  -- Migration is only possible when all pending-task messages were written in
  -- the new column structure (task_queue_id populated).
  IF EXISTS (
    SELECT 1
    FROM azure_queue_messages
    WHERE queue_name NOT IN ('claim-queue', 'deadline-queue', 'resolved-queue')
      AND task_queue_id IS NULL
  ) THEN
    RAISE EXCEPTION 'Not possible to migrate, some records are still in old format';
  END IF;

  -- Task Deadlines
  -- purpose: know when particular task expires, so dependencies/task groups can be resolved
  -- queries: what tasks are not scheduled for given task_group_id/scheduler_id
  CREATE TABLE queue_task_deadlines (
    task_id text not null,
    task_group_id text not null,
    scheduler_id text not null,
    created_at timestamptz not null,
    deadline_at timestamptz not null,
    expires timestamptz not null,
    pop_receipt uuid null
  );

  -- Copy unexpired deadline-queue messages; the message's visible time becomes
  -- deadline_at and its insertion time becomes created_at.
  INSERT INTO queue_task_deadlines (
    task_group_id,
    task_id,
    scheduler_id,
    created_at,
    deadline_at,
    expires,
    pop_receipt
  )
  SELECT
    payload->>'taskGroupId',
    payload->>'taskId',
    payload->>'schedulerId',
    inserted,
    visible,
    expires,
    pop_receipt
  FROM azure_queue_messages
  WHERE queue_name = 'deadline-queue'
    AND expires > now();

  -- Resolved Tasks
  -- queries: what tasks are not scheduled for given task_group_id/scheduler_id
  -- purpose: not much since this will have short-lived data
  CREATE TABLE queue_resolved_tasks (
    task_group_id text not null,
    task_id text not null,
    scheduler_id text not null,
    resolution text not null,
    resolved_at timestamptz not null,
    expires timestamptz not null,
    pop_receipt uuid null
  );

  -- Copy unexpired resolved-queue messages; insertion time becomes resolved_at.
  INSERT INTO queue_resolved_tasks (
    task_id,
    task_group_id,
    scheduler_id,
    resolution,
    resolved_at,
    expires,
    pop_receipt
  )
  SELECT
    payload->>'taskId',
    payload->>'taskGroupId',
    payload->>'schedulerId',
    payload->>'resolution',
    inserted,
    expires,
    pop_receipt
  FROM azure_queue_messages
  WHERE queue_name = 'resolved-queue'
    AND expires > now();

  -- Claimed Tasks
  -- queries: what tasks are running and waiting to be reclaimed or resolved
  CREATE TABLE queue_claimed_tasks (
    task_id text not null,
    run_id integer not null,
    claimed_at timestamptz not null,
    taken_until timestamptz not null,
    expires timestamptz not null,
    pop_receipt uuid null
  );

  -- Copy unexpired claim-queue messages; runId and takenUntil are stored as
  -- text inside the jsonb payload and are cast to their column types.
  INSERT INTO queue_claimed_tasks (
    task_id,
    run_id,
    claimed_at,
    taken_until,
    expires,
    pop_receipt
  )
  SELECT
    payload->>'taskId',
    CAST(payload->>'runId' AS INTEGER),
    inserted,
    CAST(payload->>'takenUntil' AS timestamp with time zone),
    expires,
    pop_receipt
  FROM azure_queue_messages
  WHERE queue_name = 'claim-queue'
    AND expires > now();

  -- Pending (scheduled)
  -- purpose: keep the record of all tasks that were scheduled and claimed
  -- queries:
  -- * what tasks are pending for a given queue
  -- * what tasks are claimed and still running (for claim expire purpose)
  -- * find tasks that were claimed but not resolved (for claim expire purpose)
  CREATE TABLE queue_pending_tasks (
    task_queue_id text not null,
    priority int not null,
    task_id text not null,
    run_id integer not null,
    hint_id text not null, -- to know that task was claimed properly, stored inside tasks.runs[]
    inserted_at timestamptz not null,
    expires timestamptz not null,
    pop_receipt uuid null
  );

  -- Every remaining unexpired message (any queue other than the three handled
  -- above) is a pending task and goes into queue_pending_tasks.
  INSERT INTO queue_pending_tasks (
    task_queue_id,
    priority,
    task_id,
    run_id,
    hint_id,
    inserted_at,
    expires,
    pop_receipt
  )
  SELECT
    task_queue_id,
    priority,
    payload->>'taskId',
    CAST(payload->>'runId' AS INTEGER),
    payload->>'hintId',
    inserted,
    expires,
    pop_receipt
  FROM azure_queue_messages
  WHERE queue_name NOT IN ('claim-queue', 'deadline-queue', 'resolved-queue')
    AND task_queue_id IS NOT NULL
    AND expires > now();

  -- Give the queue service user access to all four new tables, including
  -- queue_claimed_tasks.
  GRANT select, insert, update, delete ON queue_pending_tasks to $db_user_prefix$_queue;
  GRANT select, insert, update, delete ON queue_task_deadlines to $db_user_prefix$_queue;
  GRANT select, insert, update, delete ON queue_resolved_tasks to $db_user_prefix$_queue;
  GRANT select, insert, update, delete ON queue_claimed_tasks to $db_user_prefix$_queue;
end;
-- indexes tbd..
-- alter table queue_pending_tasks add primary key (task_queue_id, task_id);
-- Grant the queue service user full DML access to every new queue table,
-- including queue_claimed_tasks.
grant select, insert, update, delete on queue_pending_tasks to $db_user_prefix$_queue;
grant select, insert, update, delete on queue_task_deadlines to $db_user_prefix$_queue;
grant select, insert, update, delete on queue_resolved_tasks to $db_user_prefix$_queue;
grant select, insert, update, delete on queue_claimed_tasks to $db_user_prefix$_queue;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment