Skip to content

Instantly share code, notes, and snippets.

@gajus
Created November 30, 2018 22:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gajus/2b68111bab05ed017d56cc81c3d42dc2 to your computer and use it in GitHub Desktop.
Save gajus/2b68111bab05ed017d56cc81c3d42dc2 to your computer and use it in GitHub Desktop.
CREATE OR REPLACE FUNCTION cinema_data_task_with_high_error_rate(for_update boolean)
RETURNS table(cinema_data_task_id int)
AS $$
BEGIN
RETURN QUERY
EXECUTE $q$
WITH
latest_cinema_root_cinema_data_task AS (
SELECT cinema_id, max(id) root_cinema_data_task_id
FROM cinema_data_task
WHERE target_data = 'venues'
GROUP BY cinema_id
),
active_cinema_data_task_target_data AS (
SELECT DISTINCT ON (cdtv1.cinema_id)
cdtv1.cinema_id,
cdtv1.target_data
FROM cinema_data_task_view cdtv1
INNER JOIN latest_cinema_root_cinema_data_task lcrcdt1 ON lcrcdt1.cinema_id = cdtv1.cinema_id
WHERE
cdtv1.root_cinema_data_task_id = lcrcdt1.root_cinema_data_task_id AND
cdtv1.state IN ('OUTSTANDING', 'RUNNING')
ORDER BY
cdtv1.cinema_id,
cdtv1.target_data
),
last_cinema_target_data_cinema_data_task_sample AS (
SELECT
cinema_data_task_sample.cinema_id,
cinema_data_task_sample.target_data,
cinema_data_task_sample.terminated_reason_is_safe
FROM active_cinema_data_task_target_data acdttd1
INNER JOIN LATERAL (
SELECT
cdtv1.cinema_id,
cdtv1.target_data,
cdtv1.terminated_reason_is_safe
FROM cinema_data_task_view cdtv1
INNER JOIN latest_cinema_root_cinema_data_task lcrcdt1 ON lcrcdt1.cinema_id = cdtv1.cinema_id
WHERE
cdtv1.root_cinema_data_task_id = lcrcdt1.root_cinema_data_task_id AND
cdtv1.cinema_id = acdttd1.cinema_id AND
cdtv1.target_data = acdttd1.target_data AND
cdtv1.last_attempted_at IS NOT NULL AND
cdtv1.last_attempted_at > now() - INTERVAL '3 hour'
ORDER BY cdtv1.last_attempted_at DESC
LIMIT 100
) cinema_data_task_sample ON TRUE
),
running_stats AS (
SELECT
lctdcdts1.cinema_id,
lctdcdts1.target_data,
count(*) task_count,
sum(
CASE
WHEN
terminated_reason_is_safe IS NOT NULL AND
terminated_reason_is_safe = FALSE
THEN 1
ELSE 0
END
) unsafe_termination_count,
sum(
CASE
WHEN
terminated_reason_is_safe IS NOT NULL AND
terminated_reason_is_safe = TRUE
THEN 1
ELSE 0
END
) safe_termination_count
FROM last_cinema_target_data_cinema_data_task_sample lctdcdts1
GROUP BY
lctdcdts1.cinema_id,
lctdcdts1.target_data
),
high_error_rate_cinema_data_task_cinema_target_data AS (
SELECT
rs1.cinema_id,
rs1.target_data
FROM running_stats rs1
WHERE
rs1.task_count = 100 AND
rs1.unsafe_termination_count >= 50
)
SELECT cdt1.id
FROM high_error_rate_cinema_data_task_cinema_target_data hercdtctd1
INNER JOIN cinema_data_task cdt1 ON cdt1.cinema_id = hercdtctd1.cinema_id AND cdt1.target_data = hercdtctd1.target_data
INNER JOIN cinema_data_task_view cdtv1 ON cdtv1.id = cdt1.id
WHERE
cdtv1.state IN ('OUTSTANDING', 'RUNNING')
-- Prevent "cinema_data_task_queue" from creating new tasks.
$q$ || (CASE WHEN for_update = TRUE THEN 'FOR UPDATE OF cdt1' ELSE '' END) || $q$
$q$;
END
$$
LANGUAGE plpgsql;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment