-- Source: GitHub gist wesseljt/61189cd96db1baf09cde18bc0b78bc6d
-- Author: @wesseljt, created May 7, 2024 22:29
-- Staging table for the final merge result. Being TEMPORARY, it lives only for
-- the current session. Columns: the user, the array of IDs merged into that
-- user, the earliest timestamp among the contributing mappings, and a JSON
-- document with audit metadata.
CREATE TEMPORARY TABLE merged_user_ids (
    user_id                    INTEGER,
    merged_id_array            INTEGER[],
    earliest_mapping_timestamp TIMESTAMP,
    audit_details              JSON
);
-- Expand user_stitching into its transitive closure and store, for every
-- user_id, the sorted set of mapped_ids reachable from it together with the
-- earliest timestamp seen along any contributing mapping.
-- NOTE(review): mappings are followed in one direction only
-- (user_id -> mapped_id). If true connected components are required, the
-- reversed edges must also be seeded; confirm the intended semantics.
WITH RECURSIVE recursive_user_mappings AS (
    -- Seed: the direct mappings as recorded in user_stitching.
    SELECT
        user_id,
        mapped_id,
        mapping_type,
        earliest_mapping_timestamp
    FROM user_stitching
    UNION
    -- Recursive step: extend every mapping found so far by one direct hop.
    -- PostgreSQL allows the recursive term to reference the CTE only once, so
    -- the hop is taken from the base table instead of self-joining the CTE;
    -- repeated single hops still produce the full transitive closure.
    -- UNION (not UNION ALL) discards rows that were already produced, which is
    -- what makes the recursion terminate when the mappings contain cycles.
    SELECT
        reached.user_id,
        hop.mapped_id,
        'indirect' AS mapping_type,
        LEAST(reached.earliest_mapping_timestamp, hop.earliest_mapping_timestamp) AS earliest_mapping_timestamp
    FROM recursive_user_mappings AS reached
    INNER JOIN user_stitching AS hop
        ON reached.mapped_id = hop.user_id
    WHERE reached.user_id != hop.mapped_id -- never map a user onto itself
),
clusters AS (
    -- Collapse to one row per user_id: all reachable mapped_ids (sorted so the
    -- stored array is deterministic) and the earliest mapping timestamp.
    SELECT
        user_id,
        ARRAY_AGG(DISTINCT mapped_id ORDER BY mapped_id) AS all_mapped_ids,
        MIN(earliest_mapping_timestamp) AS first_mapped_timestamp
    FROM recursive_user_mappings
    GROUP BY user_id
)
-- Explicit column list so the insert does not depend on column order.
INSERT INTO merged_user_ids (
    user_id,
    merged_id_array,
    earliest_mapping_timestamp,
    audit_details
)
SELECT
    user_id,
    all_mapped_ids,
    first_mapped_timestamp,
    json_build_object(
        'created_at', CURRENT_TIMESTAMP,
        'source_count', array_length(all_mapped_ids, 1),
        'notes', 'Merged based on recursive clustering'
    ) AS audit_details
FROM clusters;
-- Special handling for users that have more than one distinct direct mapping
-- in user_stitching: replace their merged_id_array with the sorted list of
-- direct mapped_ids and rewrite the audit note.
-- NOTE(review): this overwrites the array produced by the recursive
-- clustering with direct mappings only, and source_count is computed but never
-- used; confirm both are intended.
WITH multiple_mappings AS (
    SELECT
        user_id,
        ARRAY_AGG(DISTINCT mapped_id ORDER BY mapped_id) AS ids,
        COUNT(DISTINCT source) AS source_count
    FROM user_stitching
    GROUP BY user_id
    HAVING COUNT(DISTINCT mapped_id) > 1
)
UPDATE merged_user_ids
SET
    merged_id_array = multiple_mappings.ids,
    -- audit_details is declared JSON, but jsonb_set only accepts JSONB:
    -- cast in for the edit and back out for the assignment.
    audit_details = CAST(
        jsonb_set(
            CAST(merged_user_ids.audit_details AS JSONB),
            '{notes}',
            '"Updated with multiple source mappings"'
        ) AS JSON
    )
FROM multiple_mappings
WHERE merged_user_ids.user_id = multiple_mappings.user_id;
-- Consistency report over the merged data: flatten every merged_id_array into
-- one row per (user_id, merged_id) pair, then count the distinct users and the
-- distinct merged IDs. Rows whose array is NULL or empty yield no pairs.
WITH validation AS (
    SELECT
        merged.user_id,
        member.merged_id
    FROM merged_user_ids AS merged
    CROSS JOIN LATERAL unnest(merged.merged_id_array) AS member (merged_id)
)
SELECT
    'Validation Report' AS report,
    COUNT(DISTINCT user_id) AS unique_users,
    COUNT(DISTINCT merged_id) AS total_ids
FROM validation;
-- Copy every merged row into merge_audit_logs so the run can be traced and,
-- if necessary, rolled back.
-- NOTE(review): the target column list is omitted because the schema of
-- merge_audit_logs is not visible here; values are matched by position.
INSERT INTO merge_audit_logs
SELECT
    merged.user_id,
    merged.merged_id_array,
    merged.earliest_mapping_timestamp,
    merged.audit_details
FROM merged_user_ids AS merged;
-- Final cleanup of temporary structures if needed (commented out for safety).
-- recursive_user_mappings and clusters are CTEs that exist only within their
-- own statement, so only the temporary table would ever need dropping:
-- DROP TABLE IF EXISTS merged_user_ids;
-- Queries to check the results and ensure data integrity

-- Sample check for auditing. Columns are listed explicitly and the rows are
-- ordered so repeated runs show the same ten rows.
SELECT
    user_id,
    merged_id_array,
    earliest_mapping_timestamp,
    audit_details
FROM merged_user_ids
ORDER BY user_id
LIMIT 10;

-- Check for duplicates: returns one row per user_id that appears more than
-- once in merged_user_ids; an empty result means there are none.
SELECT
    user_id,
    COUNT(*)
FROM merged_user_ids
GROUP BY user_id
HAVING COUNT(*) > 1
ORDER BY user_id;
-- (GitHub page footer: sign in on GitHub to comment on this gist.)