Created
November 30, 2018 22:31
-
-
Save gajus/2b68111bab05ed017d56cc81c3d42dc2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CREATE OR REPLACE FUNCTION cinema_data_task_with_high_error_rate(for_update boolean) | |
RETURNS table(cinema_data_task_id int) | |
AS $$ | |
BEGIN | |
RETURN QUERY | |
EXECUTE $q$ | |
WITH | |
latest_cinema_root_cinema_data_task AS ( | |
SELECT cinema_id, max(id) root_cinema_data_task_id | |
FROM cinema_data_task | |
WHERE target_data = 'venues' | |
GROUP BY cinema_id | |
), | |
active_cinema_data_task_target_data AS ( | |
SELECT DISTINCT ON (cdtv1.cinema_id) | |
cdtv1.cinema_id, | |
cdtv1.target_data | |
FROM cinema_data_task_view cdtv1 | |
INNER JOIN latest_cinema_root_cinema_data_task lcrcdt1 ON lcrcdt1.cinema_id = cdtv1.cinema_id | |
WHERE | |
cdtv1.root_cinema_data_task_id = lcrcdt1.root_cinema_data_task_id AND | |
cdtv1.state IN ('OUTSTANDING', 'RUNNING') | |
ORDER BY | |
cdtv1.cinema_id, | |
cdtv1.target_data | |
), | |
last_cinema_target_data_cinema_data_task_sample AS ( | |
SELECT | |
cinema_data_task_sample.cinema_id, | |
cinema_data_task_sample.target_data, | |
cinema_data_task_sample.terminated_reason_is_safe | |
FROM active_cinema_data_task_target_data acdttd1 | |
INNER JOIN LATERAL ( | |
SELECT | |
cdtv1.cinema_id, | |
cdtv1.target_data, | |
cdtv1.terminated_reason_is_safe | |
FROM cinema_data_task_view cdtv1 | |
INNER JOIN latest_cinema_root_cinema_data_task lcrcdt1 ON lcrcdt1.cinema_id = cdtv1.cinema_id | |
WHERE | |
cdtv1.root_cinema_data_task_id = lcrcdt1.root_cinema_data_task_id AND | |
cdtv1.cinema_id = acdttd1.cinema_id AND | |
cdtv1.target_data = acdttd1.target_data AND | |
cdtv1.last_attempted_at IS NOT NULL AND | |
cdtv1.last_attempted_at > now() - INTERVAL '3 hour' | |
ORDER BY cdtv1.last_attempted_at DESC | |
LIMIT 100 | |
) cinema_data_task_sample ON TRUE | |
), | |
running_stats AS ( | |
SELECT | |
lctdcdts1.cinema_id, | |
lctdcdts1.target_data, | |
count(*) task_count, | |
sum( | |
CASE | |
WHEN | |
terminated_reason_is_safe IS NOT NULL AND | |
terminated_reason_is_safe = FALSE | |
THEN 1 | |
ELSE 0 | |
END | |
) unsafe_termination_count, | |
sum( | |
CASE | |
WHEN | |
terminated_reason_is_safe IS NOT NULL AND | |
terminated_reason_is_safe = TRUE | |
THEN 1 | |
ELSE 0 | |
END | |
) safe_termination_count | |
FROM last_cinema_target_data_cinema_data_task_sample lctdcdts1 | |
GROUP BY | |
lctdcdts1.cinema_id, | |
lctdcdts1.target_data | |
), | |
high_error_rate_cinema_data_task_cinema_target_data AS ( | |
SELECT | |
rs1.cinema_id, | |
rs1.target_data | |
FROM running_stats rs1 | |
WHERE | |
rs1.task_count = 100 AND | |
rs1.unsafe_termination_count >= 50 | |
) | |
SELECT cdt1.id | |
FROM high_error_rate_cinema_data_task_cinema_target_data hercdtctd1 | |
INNER JOIN cinema_data_task cdt1 ON cdt1.cinema_id = hercdtctd1.cinema_id AND cdt1.target_data = hercdtctd1.target_data | |
INNER JOIN cinema_data_task_view cdtv1 ON cdtv1.id = cdt1.id | |
WHERE | |
cdtv1.state IN ('OUTSTANDING', 'RUNNING') | |
-- Prevent "cinema_data_task_queue" from creating new tasks. | |
$q$ || (CASE WHEN for_update = TRUE THEN 'FOR UPDATE OF cdt1' ELSE '' END) || $q$ | |
$q$; | |
END | |
$$ | |
LANGUAGE plpgsql; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment