Skip to content

Instantly share code, notes, and snippets.

@denniskigen
Created October 28, 2019 08:31
Show Gist options
  • Save denniskigen/5cdc678ca810cfceeb16d966ab4f8f5a to your computer and use it in GitHub Desktop.
Save denniskigen/5cdc678ca810cfceeb16d966ab4f8f5a to your computer and use it in GitHub Desktop.
CREATE DEFINER=`etl_user`@`%` PROCEDURE `generate_flat_onc_htn_dm_triage_v1_0`(IN query_type VARCHAR(50), IN queue_number INT, IN queue_size INT, IN cycle_size INT)
BEGIN
SET @primary_table := "flat_onc_htn_dm_triage";
SET @query_type := query_type;
SET @total_rows_written := 0;
SET @encounter_types := "(130)";
SET @start := now();
SET @table_version := "flat_onc_htn_dm_triage_v1.0";
SET session sort_buffer_size := 512000000;
SET @sep := " ## ";
SET @boundary := "!!";
SET @last_date_created := (SELECT MAX(max_date_created) FROM etl.flat_obs);
CREATE TABLE IF NOT EXISTS flat_onc_htn_dm_triage (
date_created TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
person_id INT,
encounter_id INT,
encounter_type INT,
cur_visit_type INT,
location_id INT,
encounter_datetime DATETIME,
temperature DECIMAL(3, 1),
waist_circumference DECIMAL(3, 1),
weight DECIMAL(3, 1),
height DECIMAL(3, 1),
bmi DECIMAL(2, 1),
systolic_bp INT,
diastolic_bp INT,
fasting_blood_glucose DECIMAL(3, 1),
random_blood_glucose DECIMAL(3, 1),
PRIMARY KEY encounter_id (encounter_id),
INDEX date_created (date_created)
);
IF (@query_type = "build") THEN
SELECT 'BUILDING..........................................';
SET @write_table := CONCAT('flat_onc_htn_dm_triage_temp_', queue_number);
SET @queue_table := CONCAT('flat_onc_htn_dm_triage_build_queue_', queue_number);
SET @dyn_sql := CONCAT('CREATE TABLE IF NOT EXISTS ', @write_table, ' LIKE ', @primary_table);
PREPARE s1 FROM @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
SET @dyn_sql := CONCAT('CREATE TABLE IF NOT EXISTS ', @queue_table, ' (SELECT * FROM flat_onc_htn_dm_triage_build_queue LIMIT ', queue_size, ');');
PREPARE s1 FROM @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
END IF;
IF (@query_type = 'sync') THEN
SELECT 'SYNCING.....................................';
SET @write_table := 'flat_onc_htn_dm_triage';
SET @queue_table := 'flat_onc_htn_dm_triage_sync_queue';
CREATE TABLE IF NOT EXISTS flat_onc_htn_dm_triage_sync_queue (person_id INT PRIMARY KEY);
SET @last_update := null;
SELECT MAX(date_updated) INTO @last_update FROM etl.flat_log WHERE table_name = @table_version;
SELECT 'Finding patients in amrs.encounters...';
REPLACE INTO flat_onc_htn_dm_triage_sync_queue
(SELECT DISTINCT
patient_id
FROM
amrs.encounter
WHERE
date_changed > @last_update
);
SELECT 'Finding patients in flat_obs...';
REPLACE INTO flat_onc_htn_dm_triage_sync_queue
(SELECT DISTINCT
person_id
FROM
etl.flat_obs
WHERE
max_date_created > @last_update
);
SELECT 'Finding patients in flat_lab_obs...';
REPLACE INTO flat_onc_htn_dm_triage_sync_queue
(SELECT DISTINCT
person_id
FROM
etl.flat_lab_obs
WHERE
max_date_created > @last_update
);
SELECT 'Finding patients in flat_orders...';
REPLACE INTO flat_onc_htn_dm_triage_sync_queue
(SELECT DISTINCT
person_id
FROM
etl.flat_orders
WHERE
max_date_created > @last_update
);
REPLACE INTO flat_onc_htn_dm_triage_sync_queue
(SELECT
person_id
FROM
amrs.person
WHERE
date_voided > @last_update
);
REPLACE INTO flat_onc_htn_dm_triage_sync_queue
(SELECT
person_id
FROM
amrs.person
WHERE
date_changed > @last_update
);
END IF;
-- Remove test patients
SET @dyn_sql := CONCAT('DELETE t1 FROM ', @queue_table, ' t1
JOIN amrs.person_attribute t2 USING (person_id)
WHERE t2.person_attribute_type_id = 28 AND value = "true" AND voided = 0');
PREPARE s1 FROM @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
SET @person_ids_count = 0;
SET @dyn_sql=CONCAT('SELECT COUNT(*) INTO @person_ids_count FROM ', @queue_table);
PREPARE s1 from @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
SELECT @person_ids_count AS 'num of patients to update';
SET @dyn_sql := CONCAT('DELETE t1 FROM ',@primary_table, ' t1 JOIN ', @queue_table,' t2 USING (person_id);');
PREPARE s1 FROM @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
SET @person_ids_count = 0;
SET @dyn_sql := CONCAT('SELECT COUNT(*) INTO @person_ids_count FROM ', @queue_table);
PREPARE s1 FROM @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
SELECT @person_ids_count AS 'num patients to update';
SET @dyn_sql := CONCAT('DELETE t1 FROM ', @primary_table, ' t1 join ', @queue_table,' t2 USING (person_id);');
PREPARE s1 FROM @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
SET @total_time = 0;
SET @cycle_number = 0;
SET @total_time = 0;
SET @cycle_number = 0;
WHILE @person_ids_count > 0 DO
SET @loop_start_time := now();
-- Create temporary table with a set of person ids
DROP TEMPORARY TABLE IF EXISTS flat_onc_htn_dm_triage_build_queue__0;
SET @dyn_sql := CONCAT('CREATE TEMPORARY TABLE flat_onc_htn_dm_triage_build_queue__0 (person_id INT PRIMARY KEY) (SELECT * FROM ', @queue_table, ' LIMIT ', cycle_size, ');');
PREPARE s1 FROM @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
DROP TEMPORARY TABLE IF EXISTS flat_onc_htn_dm_triage_0a;
SET @dyn_sql := CONCAT(
'CREATE TEMPORARY TABLE flat_onc_htn_dm_triage_0a
(SELECT
t1.person_id,
t1.encounter_id,
t1.encounter_datetime,
t1.encounter_type,
t1.location_id,
t1.obs,
t1.obs_datetimes,
t2.orders
FROM
etl.flat_obs t1
JOIN
flat_onc_htn_dm_triage_build_queue__0 t0 USING (person_id)
LEFT JOIN
etl.flat_orders t2 USING (encounter_id)
WHERE
t1.encounter_type IN ', @encounter_types, ');');
PREPARE s1 from @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
INSERT INTO flat_onc_htn_dm_triage_0a
(SELECT
t1.person_id,
t1.encounter_id,
t1.test_datetime,
t1.encounter_type,
null,
t1.obs,
null,
null
FROM
etl.flat_lab_obs t1
JOIN
flat_onc_htn_dm_triage_build_queue__0 t0 USING (person_id)
);
DROP TEMPORARY TABLE IF EXISTS flat_onc_htn_dm_triage_0;
CREATE TEMPORARY TABLE flat_onc_htn_dm_triage_0 (INDEX encounter_id (encounter_id), INDEX person_enc (person_id, encounter_datetime))
(SELECT
*
FROM
flat_onc_htn_dm_triage_0a
ORDER BY
person_id, DATE(encounter_datetime)
);
SET @cur_visit_type := null;
SET @temperature := null;
SET @waist_circumference := null;
SET @weight := null;
SET @height := null;
SET @bmi := null;
SET @systolic_bp := null;
SET @diastolic_bp := null;
SET @fasting_blood_glucose := null;
SET @random_blood_glucose := null;
DROP TEMPORARY TABLE IF EXISTS flat_onc_htn_dm_triage_1;
CREATE TEMPORARY TABLE flat_onc_htn_dm_triage_1
(SELECT
obs,
@prev_id := @cur_id as prev_id,
@cur_id := t1.person_id as cur_id,
t1.person_id,
t1.encounter_id,
t1.encounter_type,
t1.location_id,
t1.encounter_datetime,
CASE
WHEN obs REGEXP '!!5088=' then @temperature := GetValues(obs, 5088)
END AS temperature,
CASE
WHEN obs REGEXP '!!7231=' then @waist_circumference := GetValues(obs, 7231)
END AS waist_circumference,
CASE
WHEN obs REGEXP '!!5089=' then @weight := GetValues(obs, 5089)
END AS weight,
CASE
WHEN obs REGEXP '!!5090=' then @height := GetValues(obs, 5090)
END AS height,
CASE
WHEN obs REGEXP '!!1342=' then @bmi := GetValues(obs, 1342)
END AS bmi,
CASE
WHEN obs REGEXP '!!5085=' then @systolic_bp := GetValues(obs, 5085)
END AS systolic_bp,
CASE
WHEN obs REGEXP '!!5086=' then @diastolic_bp := GetValues(obs, 5086)
END AS diastolic_bp,
CASE
WHEN obs REGEXP '!!6252=' then @fasting_blood_glucose := GetValues(obs, 6252)
END AS fasting_blood_glucose,
CASE
WHEN obs REGEXP '!!887=' then @random_blood_glucose := GetValues(obs, 887)
END AS random_blood_glucose
FROM
flat_onc_htn_dm_triage_0 t1
JOIN
amrs.person p using (person_id)
ORDER BY person_id, date(encounter_datetime) DESC
);
SET @prev_id = null;
SET @cur_id = null;
ALTER TABLE flat_onc_htn_dm_triage_1 DROP prev_id, DROP cur_id;
-- DROP TABLE IF EXISTS flat_onc_htn_dm_triage_2;
-- CREATE TEMPORARY TABLE flat_onc_htn_dm_triage_2
-- (SELECT
-- *,
-- @prev_id := @cur_id AS prev_id,
-- @cur_id := person_id AS cur_id
-- FROM
-- flat_onc_htn_dm_triage_1
-- ORDER BY person_id, DATE(encounter_datetime) DESC
-- );
-- alter table flat_onc_htn_dm_triage_2 drop prev_id, drop cur_id;
-- SET @prev_id = null;
-- SET @cur_id = null;
-- drop temporary table if exists flat_onc_htn_dm_triage_3;
-- create temporary table flat_onc_htn_dm_triage_3 (prev_encounter_datetime datetime, prev_encounter_type int, index person_enc (person_id, encounter_datetime desc))
-- (select
-- *,
-- @prev_id := @cur_id as prev_id,
-- @cur_id := t1.person_id as cur_id
-- from flat_onc_htn_dm_triage_2 t1
-- order by person_id, date(encounter_datetime)
-- );
SELECT
COUNT(*)
INTO
@new_encounter_rows
FROM
flat_onc_htn_dm_triage_1;
SELECT @new_encounter_rows;
SET @total_rows_written := @total_rows_written + @new_encounter_rows;
SELECT @total_rows_written;
SET @dyn_sql := CONCAT('replace into ', @write_table,
'(SELECT
null,
person_id,
encounter_id,
encounter_type,
location_id,
encounter_datetime,
temperature,
waist_circumference,
weight,
height,
bmi,
systolic_bp,
diastolic_bp,
fasting_blood_glucose,
random_blood_glucose
FROM
flat_onc_htn_dm_triage_3 t1
JOIN
amrs.location t2 USING (location_id))'
);
PREPARE s1 from @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
SET @dyn_sql=CONCAT('delete t1 from ',@queue_table,' t1 join flat_onc_htn_dm_triage_build_queue__0 t2 using (person_id);');
PREPARE s1 from @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
SET @dyn_sql=CONCAT('select count(*) into @person_ids_count from ',@queue_table,';');
PREPARE s1 from @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
SET @cycle_length = timestampdiff(second,@loop_start_time,now());
SET @total_time = @total_time + @cycle_length;
SET @cycle_number = @cycle_number + 1;
SET @remaining_time = ceil((@total_time / @cycle_number) * ceil(@person_ids_count / cycle_size) / 60);
SELECT
@person_ids_count AS 'persons remaining',
@cycle_length AS 'Cycle time (s)',
CEIL(@person_ids_count / cycle_size) AS remaining_cycles,
@remaining_time AS 'Est time remaining (min)';
END WHILE;
IF (@query_type = 'build') THEN
SET @dyn_sql := CONCAT('drop table ', @queue_table, ';');
PREPARE s1 FROM @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
SET @total_rows_to_write := 0;
SET @dyn_sql := CONCAT('SELECT COUNT(*) INTO @total_rows_to_write FROM ', @write_table);
PREPARE s1 FROM @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
SET @start_write := NOW();
SELECT CONCAT(@start_write, ' : Writing ', @total_rows_to_write, ' to ', @primary_table);
SET @dyn_sql := CONCAT('REPLACE INTO ', @primary_table, '(SELECT * FROM ', @write_table, ');');
PREPARE s1 FROM @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
SET @finish_write := NOW();
SET @time_to_write := TIMESTAMPDIFF(SECOND, @start_write, @finish_write);
SELECT CONCAT(@finish_write, ' : Completed writing rows. Time to write to primary table: ', @time_to_write, ' seconds ');
SET @dyn_sql := CONCAT('DROP TABLE ', @write_table, ';');
PREPARE s1 FROM @dyn_sql;
EXECUTE s1;
DEALLOCATE PREPARE s1;
END IF;
SET @ave_cycle_length := CEIL(@total_time / @cycle_number);
SELECT CONCAT('Average Cycle Length: ', @ave_cycle_length, ' second(s)');
SET @end := NOW();
INSERT INTO etl.flat_log VALUES (@start, @last_date_created, @table_version, TIMESTAMPDIFF(SECOND, @start, @end));
SELECT CONCAT(@table_version, ': Time to complete: ', TIMESTAMPDIFF(MINUTE, @start, @end), ' minutes');
END
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment