Last active
September 14, 2023 16:22
-
-
Save lotas/57779afe067e4f8eb586f37fe25016c9 to your computer and use it in GitHub Desktop.
taskcluster queue migration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- To make a future migration easier, add readable columns to azure_queue_messages.
-- Every new column is nullable, so rows that were written before this change
-- remain valid without a backfill.
ALTER TABLE azure_queue_messages
    -- readable queue identifier, exposing what the cryptic queue names hide
    ADD COLUMN task_queue_id text NULL,
    -- keep track of the priority as its own column
    ADD COLUMN priority int NULL,
    -- message body as jsonb instead of a base64 encoded string
    ADD COLUMN payload jsonb NULL;
-- patch functions to use new columns | |
-- azure_queue_put_extra: insert one message into azure_queue_messages and
-- notify listeners on a channel named after the queue.
--
-- Parameters:
--   queue_name    - queue the message belongs to; also used as the NOTIFY channel
--   message_text  - message body in the existing text form
--   visible       - stored in the `visible` column
--   expires       - stored in the `expires` column
--   task_queue_id - stored in the readable `task_queue_id` column
--   priority      - stored in the `priority` column
--   payload       - stored in the jsonb `payload` column
--
-- message_id is generated with gen_random_uuid() and `inserted` is set to now().
-- Returns nothing.
CREATE OR REPLACE FUNCTION public.azure_queue_put_extra(
  queue_name text,
  message_text text,
  visible timestamp without time zone,
  expires timestamp without time zone,
  task_queue_id text,
  priority int,
  payload jsonb
)
 RETURNS void
 LANGUAGE plpgsql
AS $function$begin
  -- parameters are qualified with the function name because they share
  -- their names with the table columns
  insert into azure_queue_messages (
    queue_name,
    message_id,
    message_text,
    inserted,
    visible,
    expires,
    task_queue_id,
    priority,
    payload
  ) values (
    azure_queue_put_extra.queue_name,
    public.gen_random_uuid(),
    azure_queue_put_extra.message_text,
    now(),
    azure_queue_put_extra.visible,
    azure_queue_put_extra.expires,
    azure_queue_put_extra.task_queue_id,
    azure_queue_put_extra.priority,
    azure_queue_put_extra.payload
  );
  -- NOTIFY takes an identifier, so the statement is built dynamically and the
  -- channel name is escaped with quote_ident
  execute 'notify ' || quote_ident(queue_name);
end$function$;
-- azure_queue_update_extra: overwrite the text, visibility time and jsonb
-- payload of a single message.
--
-- Parameters:
--   queue_name   - queue the message belongs to
--   message_text - new value for the `message_text` column
--   message_id   - id of the message to update
--   pop_receipt  - must equal the row's current pop_receipt for the update to apply
--   visible      - new value for the `visible` column
--   payload      - new value for the jsonb `payload` column
--
-- The update matches on (queue_name, message_id, pop_receipt); when no row
-- matches, nothing is changed and no error is raised. Returns nothing.
CREATE OR REPLACE FUNCTION public.azure_queue_update_extra(
  queue_name text,
  message_text text,
  message_id uuid,
  pop_receipt uuid,
  visible timestamp without time zone,
  payload jsonb
)
 RETURNS void
 LANGUAGE plpgsql
AS $function$begin
  -- parameters are qualified with the function name because they share
  -- their names with the table columns
  update azure_queue_messages msgs
  set message_text = azure_queue_update_extra.message_text,
      visible = azure_queue_update_extra.visible,
      payload = azure_queue_update_extra.payload
  where msgs.queue_name = azure_queue_update_extra.queue_name
    and msgs.message_id = azure_queue_update_extra.message_id
    and msgs.pop_receipt = azure_queue_update_extra.pop_receipt;
end$function$;
-- live migration strategy: | |
-- 0. locking azure_queue_messages for reading until data is migrated | |
-- 1. create table for resolved and deadline queues | |
-- 2. move data for resolved-queue, deadline-queue to the new tables | |
-- 3. create table for pending + claimed tasks | |
-- 4. move data for pending tasks to the new table | |
-- ^^ `-{provisionerId hash}-{workerType hash}-{priority}` hashes would need to be pre-calculated to know who is who | |
-- hash is sha256 base32 [0..15] (first 16 chars) | |
-- hard to calculate in SQL, probably we'll have to take a snapshot before the migration and encode all possible values | |
-- 5. unlock and delete azure_queue_messages | |
-- background tasks will be using the new tables, so once existing records are moved, they will pick up those messages
-- queue api would be already using new table after deployment, so new data will go into those tables directly | |
-- Migration body: split the rows of azure_queue_messages into four dedicated
-- tables (deadlines, resolved, claimed, pending) and grant the queue role
-- access to them. Only rows that have not expired yet are copied.
-- Raises an exception, aborting the migration, when any task-queue row has not
-- been written in the new column format (task_queue_id IS NULL).
begin
  -- Get exclusive access first: this prevents reads and writes on the table.
  -- The lock is taken BEFORE the format check so that no old-format row can be
  -- inserted between the check and the copy.
  LOCK TABLE azure_queue_messages;

  -- migration is only possible when all tasks were migrated to new column structure
  IF EXISTS (
    SELECT 1
    FROM azure_queue_messages
    WHERE queue_name NOT IN ('claim-queue', 'deadline-queue', 'resolved-queue')
      AND task_queue_id IS NULL
  ) THEN
    RAISE EXCEPTION 'Not possible to migrate, some records are still in old format';
  END IF;

  -- Task Deadlines
  -- purpose: know when particular task expires, so dependencies/task groups can be resolved
  -- queries: what tasks are not scheduled for given task_group_id/scheduler_id
  CREATE TABLE queue_task_deadlines (
    task_id text not null,
    task_group_id text not null,
    scheduler_id text not null,
    created_at timestamptz not null,
    deadline_at timestamptz not null,
    expires timestamptz not null,
    pop_receipt uuid null
  );

  -- migrate data to deadline queue
  -- (the message's `visible` time becomes deadline_at, `inserted` becomes created_at)
  INSERT INTO queue_task_deadlines
    (task_group_id, task_id, scheduler_id, created_at, deadline_at, expires, pop_receipt)
  SELECT
    payload->>'taskGroupId',
    payload->>'taskId',
    payload->>'schedulerId',
    inserted,
    visible,
    expires,
    pop_receipt
  FROM azure_queue_messages
  WHERE queue_name = 'deadline-queue'
    AND expires > now();

  -- Resolved Tasks
  -- purpose: not much since this will have short-lived data
  -- NOTE(review): the original "queries" note here was identical to the one on
  -- queue_task_deadlines and looks copy-pasted; confirm the intended queries.
  CREATE TABLE queue_resolved_tasks (
    task_group_id text not null,
    task_id text not null,
    scheduler_id text not null,
    resolution text not null,
    resolved_at timestamptz not null,
    expires timestamptz not null,
    pop_receipt uuid null
  );

  -- migrate data to resolved queue
  INSERT INTO queue_resolved_tasks
    (task_id, task_group_id, scheduler_id, resolution, resolved_at, expires, pop_receipt)
  SELECT
    payload->>'taskId',
    payload->>'taskGroupId',
    payload->>'schedulerId',
    payload->>'resolution',
    inserted,
    expires,
    pop_receipt
  FROM azure_queue_messages
  WHERE queue_name = 'resolved-queue'
    AND expires > now();

  -- Claimed Tasks
  -- queries: what tasks are running and waiting to be reclaimed or resolved
  CREATE TABLE queue_claimed_tasks (
    task_id text not null,
    run_id integer not null,
    claimed_at timestamptz not null,
    taken_until timestamptz not null,
    expires timestamptz not null,
    pop_receipt uuid null
  );

  -- migrate data to claimed tasks
  INSERT INTO queue_claimed_tasks
    (task_id, run_id, claimed_at, taken_until, expires, pop_receipt)
  SELECT
    payload->>'taskId',
    CAST(payload->>'runId' AS INTEGER),
    inserted,
    CAST(payload->>'takenUntil' AS timestamp with time zone),
    expires,
    pop_receipt
  FROM azure_queue_messages
  WHERE queue_name = 'claim-queue'
    AND expires > now();

  -- Pending (scheduled)
  -- purpose: keep the record of all tasks that were scheduled and claimed
  -- queries:
  -- * what tasks are pending for a given queue
  -- * what tasks are claimed and still running (for claim expire purpose)
  -- * find tasks that were claimed but not resolved (for claim expire purpose)
  CREATE TABLE queue_pending_tasks (
    task_queue_id text not null,
    priority int not null,
    task_id text not null,
    run_id integer not null,
    hint_id text not null, -- to know that task was claimed properly, stored inside tasks.runs[]
    inserted_at timestamptz not null,
    expires timestamptz not null,
    pop_receipt uuid null
  );

  -- rest goes into queue_pending_tasks
  INSERT INTO queue_pending_tasks
    (task_queue_id, priority, task_id, run_id, hint_id, inserted_at, expires, pop_receipt)
  SELECT
    task_queue_id,
    priority,
    payload->>'taskId',
    CAST(payload->>'runId' AS INTEGER),
    payload->>'hintId',
    inserted,
    expires,
    pop_receipt
  FROM azure_queue_messages
  WHERE queue_name NOT IN ('claim-queue', 'deadline-queue', 'resolved-queue')
    AND task_queue_id IS NOT NULL
    AND expires > now();

  -- the queue role needs access to every table created above,
  -- including queue_claimed_tasks
  GRANT select, insert, update, delete ON queue_pending_tasks to $db_user_prefix$_queue;
  GRANT select, insert, update, delete ON queue_task_deadlines to $db_user_prefix$_queue;
  GRANT select, insert, update, delete ON queue_resolved_tasks to $db_user_prefix$_queue;
  GRANT select, insert, update, delete ON queue_claimed_tasks to $db_user_prefix$_queue;
end
-- indexes tbd..
-- alter table queue_pending_tasks add primary key (task_queue_id, task_id);

-- Give the queue role read/write access to each of the new queue tables;
-- queue_claimed_tasks is included so the claimed-task table is usable as well.
grant select, insert, update, delete on queue_pending_tasks to $db_user_prefix$_queue;
grant select, insert, update, delete on queue_task_deadlines to $db_user_prefix$_queue;
grant select, insert, update, delete on queue_resolved_tasks to $db_user_prefix$_queue;
grant select, insert, update, delete on queue_claimed_tasks to $db_user_prefix$_queue;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment