Skip to content

Instantly share code, notes, and snippets.

@rmoff
Last active March 23, 2025 12:31
Show Gist options
  • Select an option

  • Save rmoff/461fd169843063fc1b9b3113759ff5b6 to your computer and use it in GitHub Desktop.

Select an option

Save rmoff/461fd169843063fc1b9b3113759ff5b6 to your computer and use it in GitHub Desktop.
UK Environment Agency / DuckDB - Data Pipeline

UK Environment Agency + DuckDB

@rmoff / 2025-03-21

          _      _      _
        >(.)__ <(.)__ =(.)__
         (___/  (___/  (___/
°º¤ø,¸¸,ø¤º°`°º¤ø,¸,ø¤°º¤ø,¸¸,ø¤º°`°º¤ø,¸
  1. Install DuckDB

  2. Run prime.sql to set things up

  3. Schedule dimensions.sql and fact.sql to run periodically (e.g. every fifteen minutes)

  4. Run load_history.sh as required.

-- Stage the current list of measures from the EA flood-monitoring REST API.
CREATE OR REPLACE TABLE measures_stg AS
WITH src AS (
    SELECT *
    FROM read_json('https://environment.data.gov.uk/flood-monitoring/id/measures')
),
exploded AS (
    -- The payload wraps the records in an `items` array; unnest to one row per measure.
    SELECT UNNEST(items) AS u
    FROM src
)
SELECT u.*
FROM exploded;
-- Stage the current list of stations from the EA flood-monitoring REST API.
CREATE OR REPLACE TABLE stations_stg AS
WITH src AS (
    SELECT *
    FROM read_json('https://environment.data.gov.uk/flood-monitoring/id/stations')
),
exploded AS (
    -- Unnest the `items` array to one row per station.
    SELECT UNNEST(items) AS u
    FROM src
)
SELECT u.*
FROM exploded;
-- Rebuild dimension tables
-- Rebuild the `measures` dimension from staging.
-- EXCLUDE drops the JSON-LD "@id" column and the nested latestReading struct;
-- REPLACE rewrites `station` in place, stripping the station URL prefix so the
-- column holds just the station notation (joinable to stations.notation).
CREATE OR REPLACE TABLE measures AS
SELECT *
EXCLUDE ("@id", latestReading)
REPLACE(
REGEXP_REPLACE(station,
'http://environment\.data\.gov\.uk/flood-monitoring/id/stations/',
'') AS station
)
FROM measures_stg;
-- Enforce uniqueness of the measure's natural key.
ALTER TABLE measures
ADD CONSTRAINT measures_pk PRIMARY KEY (notation);
-- Rebuild the `stations` dimension; drop the nested `measures` array since
-- measures are held in their own dimension table.
CREATE OR REPLACE TABLE stations AS
SELECT * EXCLUDE (measures)
FROM stations_stg;
-- Enforce uniqueness of the station's natural key.
ALTER TABLE stations
ADD CONSTRAINT stations_pk PRIMARY KEY (notation);
-- Stage the latest reading per measure from the EA flood-monitoring REST API.
CREATE OR REPLACE TABLE readings_stg AS
WITH src AS (
    SELECT *
    FROM read_json('https://environment.data.gov.uk/flood-monitoring/data/readings?latest')
),
exploded AS (
    -- Unnest the `items` array to one row per reading.
    SELECT UNNEST(items) AS u
    FROM src
)
SELECT u.*
FROM exploded;
-- Merge into the fact table
-- INSERT OR IGNORE plus the (dateTime, measure) primary key makes this an
-- append-only, idempotent load: rows already present are silently skipped.
INSERT OR IGNORE INTO readings
SELECT *
EXCLUDE "@id"
REPLACE(
-- Strip the measure URL prefix so the column joins to measures.notation.
REGEXP_REPLACE(measure,
'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
'') AS measure)
FROM readings_stg;
-- Append any new rows to the denormalised reporting table; the primary key
-- plus INSERT OR IGNORE keeps repeated runs idempotent.
INSERT OR IGNORE INTO readings_enriched
SELECT *
FROM vw_readings_enriched;

-- Sanity check: per-day row counts and reading-timestamp bounds.
SELECT
    DATE_TRUNC('day', r_dateTime) AS day,
    COUNT(*)                      AS row_ct,
    MIN(r_dateTime)               AS min_dateTime,
    MAX(r_dateTime)               AS max_dateTime
FROM readings_enriched
GROUP BY day
ORDER BY day;
# duckdb new-env-agency.duckdb -c "CREATE OR REPLACE TABLE readings_historical AS
# SELECT *
# FROM read_csv('https://environment.data.gov.uk/flood-monitoring/archive/readings-2025-01-01.csv',
# ignore_errors=true)"
# start_date="2025-01-02"
# end_date="2025-03-20"
# current_date=$start_date
# while [[ "$current_date" < "$end_date" || "$current_date" == "$end_date" ]]; do
# echo "Processing $current_date..."
# duckdb new-env-agency.duckdb -c "COPY readings_historical FROM 'https://environment.data.gov.uk/flood-monitoring/archive/readings-$current_date.csv' (IGNORE_ERRORS);"
# current_date=$(date -d "$current_date + 1 day" +%Y-%m-%d)
# done
# Load historical rows whose `value` holds multiple readings ("<v1>|<v2>"):
# keep only the first value, and strip the measure URL prefix down to its
# notation so it matches readings.measure. Dots in the regex are escaped
# (consistent with the patterns in prime.sql/fact.sql); unescaped, `.` would
# match any character at those positions.
duckdb new-env-agency.duckdb -c "INSERT OR IGNORE INTO readings
SELECT *
REPLACE(
  REGEXP_REPLACE(measure,
                 'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
                 '') AS measure,
  SPLIT_PART(value, '|', 1) AS value)
FROM readings_historical
WHERE value LIKE '%|%';"
# Load the remaining historical rows (single-value `value` column), stripping
# the measure URL prefix. Regex dots escaped for consistency with the other
# REGEXP_REPLACE patterns in this pipeline — unescaped `.` matches any char.
duckdb new-env-agency.duckdb -c "INSERT OR IGNORE INTO readings
SELECT *
REPLACE(
  REGEXP_REPLACE(measure,
                 'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
                 '') AS measure)
FROM readings_historical
WHERE value NOT LIKE '%|%';"
# Post-load sanity check: total rows and the timestamp range now in `readings`.
duckdb new-env-agency.duckdb -c "SELECT
    COUNT(*)      AS row_ct,
    MIN(dateTime) AS min_dateTime,
    MAX(dateTime) AS max_dateTime
FROM readings;"
-- Stage the latest reading per measure from the EA flood-monitoring REST API.
CREATE OR REPLACE TABLE readings_stg AS
WITH src AS (
    SELECT *
    FROM read_json('https://environment.data.gov.uk/flood-monitoring/data/readings?latest')
),
exploded AS (
    -- One row per element of the `items` array.
    SELECT UNNEST(items) AS u
    FROM src
)
SELECT u.*
FROM exploded;
-- Stage the current list of measures from the EA flood-monitoring REST API.
CREATE OR REPLACE TABLE measures_stg AS
WITH src AS (
    SELECT *
    FROM read_json('https://environment.data.gov.uk/flood-monitoring/id/measures')
),
exploded AS (
    -- One row per element of the `items` array.
    SELECT UNNEST(items) AS u
    FROM src
)
SELECT u.*
FROM exploded;
-- Stage the current list of stations from the EA flood-monitoring REST API.
CREATE OR REPLACE TABLE stations_stg AS
WITH src AS (
    SELECT *
    FROM read_json('https://environment.data.gov.uk/flood-monitoring/id/stations')
),
exploded AS (
    -- One row per element of the `items` array.
    SELECT UNNEST(items) AS u
    FROM src
)
SELECT u.*
FROM exploded;
-- Build the `measures` dimension from staging.
-- EXCLUDE drops the JSON-LD "@id" column and the nested latestReading struct;
-- REPLACE rewrites `station` in place, stripping the station URL prefix so the
-- column holds just the station notation (joinable to stations.notation).
CREATE OR REPLACE TABLE measures AS
SELECT *
EXCLUDE ("@id", latestReading)
REPLACE(
REGEXP_REPLACE(station,
'http://environment\.data\.gov\.uk/flood-monitoring/id/stations/',
'') AS station
)
FROM measures_stg;
-- Enforce uniqueness of the measure's natural key.
ALTER TABLE measures
ADD CONSTRAINT measures_pk PRIMARY KEY (notation);
-- Build the `stations` dimension; drop the nested `measures` array since
-- measures are held in their own dimension table.
CREATE OR REPLACE TABLE stations AS
SELECT * EXCLUDE (measures)
FROM stations_stg;
-- Enforce uniqueness of the station's natural key.
ALTER TABLE stations
ADD CONSTRAINT stations_pk PRIMARY KEY (notation);
-- Create the `readings` fact table with the staging schema (minus "@id") but
-- no rows: WHERE FALSE copies column names/types only. IF NOT EXISTS preserves
-- accumulated data across re-runs of prime.sql.
CREATE TABLE IF NOT EXISTS readings AS
SELECT * EXCLUDE "@id" FROM readings_stg WHERE FALSE;
-- Composite key: one reading per measure per timestamp. Downstream loads rely
-- on this with INSERT OR IGNORE for idempotence.
-- NOTE(review): if prime.sql is re-run with the table (and constraint) already
-- present, this ADD CONSTRAINT would fail — confirm prime.sql is one-shot.
ALTER TABLE readings
ADD CONSTRAINT readings_pk PRIMARY KEY (dateTime, measure);
-- Initial load of the fact table. INSERT OR IGNORE plus the (dateTime, measure)
-- primary key makes the load idempotent: duplicate rows are silently skipped.
INSERT OR IGNORE INTO readings
SELECT *
EXCLUDE "@id"
REPLACE(
-- Strip the measure URL prefix so the column joins to measures.notation.
REGEXP_REPLACE(measure,
'http://environment\.data\.gov\.uk/flood-monitoring/id/measures/',
'') AS measure)
FROM readings_stg;
-- Denormalised view joining each reading to its measure and that measure's station.
-- Uses DuckDB's COLUMNS(*) star expression with a prefix alias: "r_\0" renames
-- every column from `r` by prefixing "r_" (\0 expands to the original column
-- name), likewise m_/s_ — avoiding name collisions across the three tables
-- (e.g. the resulting r_dateTime/r_measure are used as the PK downstream).
-- LEFT JOINs keep readings whose measure/station is missing from the dimensions.
CREATE OR REPLACE VIEW vw_readings_enriched AS
SELECT "r_\0": COLUMNS(r.*),
"m_\0": COLUMNS(m.*),
"s_\0": COLUMNS(s.*)
FROM
readings r
LEFT JOIN measures m ON r.measure = m.notation
LEFT JOIN stations s ON m.station = s.notation;
-- Empty denormalised table with the view's column set (LIMIT 0 => schema only);
-- IF NOT EXISTS keeps accumulated data on re-runs.
CREATE TABLE IF NOT EXISTS readings_enriched AS
SELECT *
FROM vw_readings_enriched
LIMIT 0;

-- Same natural key as the fact table, under the r_ prefix.
ALTER TABLE readings_enriched
ADD CONSTRAINT readings_enriched_pk PRIMARY KEY (r_dateTime, r_measure);

-- Idempotent load: rows with a duplicate (r_dateTime, r_measure) are skipped.
INSERT OR IGNORE INTO readings_enriched
SELECT *
FROM vw_readings_enriched;

-- Sanity check: per-day row counts and reading-timestamp bounds.
SELECT
    DATE_TRUNC('day', r_dateTime) AS day,
    COUNT(*)                      AS row_ct,
    MIN(r_dateTime)               AS min_dateTime,
    MAX(r_dateTime)               AS max_dateTime
FROM readings_enriched
GROUP BY day
ORDER BY day;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment