Deduplicating rows on Tinybird almost on ingestion time
@xoelop · Last active April 23, 2023

What this does:

  • Insert rows from Postgres into a datasource
  • Find the duplicated rows with a pipe
  • Write those rows to a secondary datasource
  • Delete the duplicates in the primary datasource that are present in the secondary datasource
  • Truncate the secondary datasource

With this approach, if you run the delete scripts right after each insert operation, you'll have duplicates only for a couple of seconds.
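For example, a minimal way to schedule this (a sketch; the script path and log file are assumptions) is a cron entry running the full process below every hour. The 70-minute window in the Postgres query then gives a 10-minute overlap between runs:

# Hypothetical crontab entry; adjust the path to wherever the full-process script lives
0 * * * * cd /path/to/project && sh scripts/full_process.sh >> /var/log/tb_dedup.log 2>&1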

With a ReplacingMergeTree you'll always have some duplicated data, since merges only happen in the background (once per day or so). If you need no duplicates at all, you'll need either a materialized view that writes to an AggregatingMergeTree with argMaxState(...) functions, or something like this. With this approach, queries are simpler because you don't need argMaxMerge functions and so on. Other approaches, like ReplacingMergeTree plus adding FINAL to queries, won't work with big data, as FINAL puts the full datasource in RAM.
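For comparison, here's roughly what reads look like under each approach (a sketch; jobs_agg and url_state are hypothetical names for the AggregatingMergeTree alternative):

# With an AggregatingMergeTree fed by a MV of argMaxState(...) columns,
# every read needs -Merge combinators (hypothetical names):
#   SELECT id, argMaxMerge(url_state) AS url FROM jobs_agg GROUP BY id
# With the approach in this gist the datasource is already deduplicated,
# so a plain query works, e.g. via Tinybird's Query API:
curl -s -H "Authorization: Bearer $TINYBIRD_ADMIN_TOKEN" \
  -G --data-urlencode "q=SELECT id, url FROM jobs LIMIT 10" \
  "https://api.tinybird.co/v0/sql"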

# Full process: ingests data from Postgres into Tinybird, finds the duplicates,
# inserts them into a secondary datasource and deletes the rows in the original
# datasource that appear in the secondary one.
source $(pwd)/.env
# related, to ingest data from postgres: https://blog.tinybird.co/2019/10/14/the-one-cron-job-that-will-speed-up-your-analytical-queries-in-postgres-a-hundred-fold/
echo 'Ingesting most recently updated jobs'
psql $HEROKU_POSTGRES_URL -c "COPY (SELECT id, url, job_title, company, description, description_cleaned, date_posted, now() FROM job WHERE COALESCE(description, '') <> '' AND updated_at > now() - interval '70 minutes') TO STDOUT WITH (FORMAT CSV)" \
  | curl -F csv=@- "https://api.tinybird.co/v0/datasources?name=jobs&mode=append&token=$TINYBIRD_ADMIN_TOKEN"
echo 'Storing duplicated jobs';
sh scripts/sql/sh56_populate_jobs_to_deduplicate_datasource.sh;
echo 'Deleting duplicated jobs';
sh scripts/sql/sh57_delete_duplicated_jobs_from_tinybird.sh
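All of these scripts source a .env file; a minimal sketch of what it's expected to contain (values are placeholders):

# .env (placeholder values)
HEROKU_POSTGRES_URL='postgres://user:password@host:5432/dbname'
TINYBIRD_ADMIN_TOKEN='<your Tinybird admin token>'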
Schema of the jobs datasource:

SCHEMA >
    `id` Int64,
    `url` String,
    `job_title` String,
    `company` String,
    `description` Nullable(String),
    `description_cleaned` Nullable(String),
    `date_posted` Date,
    `updated_to_tb_at` DateTime

ENGINE MergeTree
ENGINE_SORTING_KEY id
ENGINE_PARTITION_KEY toYYYYMM(date_posted)
Schema of the jobs_to_deduplicate datasource:

SCHEMA >
    `id` Int64,
    `updated_to_tb_at` DateTime

ENGINE MergeTree
ENGINE_SORTING_KEY id, updated_to_tb_at
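If you keep these schemas as .datasource files, one way to create the datasources is with the Tinybird CLI (a sketch; the file paths are assumptions):

tb auth --token $TINYBIRD_ADMIN_TOKEN
tb push datasources/jobs.datasource
tb push datasources/jobs_to_deduplicate.datasource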
The not_deduplicated_jobs_yet pipe, with two nodes:

NODE finding_duplicated_jobs
SQL >
    SELECT
        id,
        count(*) AS c,
        groupArrayDistinct(date_posted),
        groupArray(updated_to_tb_at),
        any(url) AS url,
        max(updated_to_tb_at) AS max_uploaded_to_tb_at
    FROM jobs
    GROUP BY id
    HAVING c > 1
    ORDER BY c DESC

NODE all_jobs_to_delete_data
SQL >
    SELECT j.*
    FROM jobs j
    INNER JOIN finding_duplicated_jobs USING (id)
    WHERE updated_to_tb_at < max_uploaded_to_tb_at
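The pipe's endpoint returns the output of its last node, and the q parameter lets you run a query over it, with _ referring to that result (the same trick the populate script below uses). For instance, to peek at how many rows would be deleted:

curl -s -H "Authorization: Bearer $TINYBIRD_ADMIN_TOKEN" \
  -G --data-urlencode 'q=select count() from _' \
  "https://api.tinybird.co/v0/pipes/not_deduplicated_jobs_yet.json"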
# sh56_populate_jobs_to_deduplicate_datasource.sh
source $(pwd)/.env
# Doing this instead of directly materializing the results of not_deduplicated_jobs_yet
# to another datasource: materializing misses many rows. My guess is that it's because
# the source datasource has a ReplacingMergeTree engine and some merging happens before
# the result is materialized.
# URL-encoded version of: select id, updated_to_tb_at from _
Q='select%20id%2C%20updated_to_tb_at%20from%20_'
deduplicated_jobs_url="https://api.tinybird.co/v0/pipes/not_deduplicated_jobs_yet.csv?token=${TINYBIRD_ADMIN_TOKEN}&q=${Q}"
# echo $deduplicated_jobs_url
# Populate the jobs_to_deduplicate datasource with the result of the pipe that finds duplicated jobs
job_url=$(curl \
  -s \
  -H "Authorization: Bearer ${TINYBIRD_ADMIN_TOKEN}" \
  -X POST "https://api.tinybird.co/v0/datasources" \
  -d mode='append' \
  -d name='jobs_to_deduplicate' \
  --data-urlencode "url=${deduplicated_jobs_url}" | jq -r '.job_url')
echo "Import job url: $job_url"
# Wait for the import to finish, so that the delete step runs only after the ingestion has completed
status='start'
while [ "$status" != 'done' ]
do
  sleep 1
  echo 'Waiting for the ingest job to complete'
  status=$(curl \
    -s \
    -H "Authorization: Bearer ${TINYBIRD_ADMIN_TOKEN}" \
    -X GET "$job_url" | jq -r '.status')
done
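Note that a Tinybird job can also end in an error status, in which case the loop above would spin forever; a slightly more defensive variant of the polling loop (a sketch):

while :; do
  status=$(curl -s -H "Authorization: Bearer ${TINYBIRD_ADMIN_TOKEN}" "$job_url" | jq -r '.status')
  [ "$status" = 'done' ] && break
  if [ "$status" = 'error' ]; then
    echo 'Ingest job failed' >&2
    exit 1
  fi
  sleep 1
done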
# sh57_delete_duplicated_jobs_from_tinybird.sh
source $(pwd)/.env
DELETE_CONDITION='((id, updated_to_tb_at) IN (select id, updated_to_tb_at FROM jobs_to_deduplicate))'
# Add LIMIT 10 to the subquery to test it
# Note: you may feel tempted to set the delete condition to read directly from the pipe
# that finds duplicates. Don't do it: the delete op will time out after 30 minutes and
# you won't be able to run more deletes on that datasource. IDK why that happens.
job_url=$(curl \
  -s \
  -H "Authorization: Bearer $TINYBIRD_ADMIN_TOKEN" \
  --data-urlencode "delete_condition=${DELETE_CONDITION}" \
  "https://api.tinybird.co/v0/datasources/jobs/delete" | jq -r '.job_url')
echo "Delete job url: $job_url"
# Wait for the delete job to finish before truncating the secondary datasource
status='start'
while [ "$status" != 'done' ]
do
  sleep 1
  echo 'Waiting for the delete job to complete'
  status=$(curl \
    -s \
    -H "Authorization: Bearer ${TINYBIRD_ADMIN_TOKEN}" \
    -X GET "$job_url" | jq -r '.status')
done
echo 'Truncating the jobs_to_deduplicate datasource'
# Truncate the datasource used to store the jobs to be deleted
curl \
  -H "Authorization: Bearer $TINYBIRD_ADMIN_TOKEN" \
  -X POST "https://api.tinybird.co/v0/datasources/jobs_to_deduplicate/truncate"