Skip to content

Instantly share code, notes, and snippets.

@jaketf
Last active February 18, 2021 18:41
Show Gist options
  • Save jaketf/90b562367edb52bd9d469c44918997d2 to your computer and use it in GitHub Desktop.
Save jaketf/90b562367edb52bd9d469c44918997d2 to your computer and use it in GitHub Desktop.
BQ DML to split DML statement to modify < 4k partitions
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-- Description:
-- Split a DML statement into multiple statements that each modify fewer
-- partitions (10 years of partitions per statement)
-- This procedure should ONLY be used in scenarios where we expect to modify
-- 10s of years of daily partitions.
-- In other cases it will introduce unnecessary performance overhead.
--
-- THIS STORED PROCEDURE ASSUMES:
-- 1. The query job calling this procedure defines any referenced temporary external tables (e.g. temp_ext).
-- https://cloud.google.com/bigquery/external-data-cloud-storage#temporary-tables
-- 2. sql_string references src_table_ref (or temp_ext if src_table_ref = NULL).
-- (this will be replaced dynamically to target a specific 10 year interval)
-- 3. Target table is partitioned.
-- 4. Partition column is required.
--
-- BEHAVIOR NOTES:
-- 1. This procedure will run several child jobs that will have
-- num_dml_rows_affected = 0
--
-- Example Usage:
-- CALL stored_proc_dataset.split_dml("my_dataset", "lineitem", """
-- -- This is a SQL query from transpilation
-- INSERT my_dataset.lineitem
-- SELECT
--   *
-- FROM
--   temp_ext
-- """, NULL);
-- split_dml: executes `sql_string` once per window of `years_per_part` years
-- of source data, so that every child DML job touches a bounded number of
-- daily partitions of the target table.
--
-- Parameters:
--   dest_dataset_id  Dataset that contains the target table.
--   dest_table_id    Target table; must be partitioned on a DATE, DATETIME or
--                    TIMESTAMP column. The source must expose a column with
--                    the same name that can be CAST to DATE.
--   sql_string       DML statement that references src_table_ref
--                    (or temp_ext when src_table_ref is NULL).
--   src_table_ref    Source table reference as it appears in sql_string;
--                    NULL means "temp_ext".
--
-- Raises:
--   An error when the target table has no partitioning column, or when that
--   column is not DATE / DATETIME / TIMESTAMP.
--
-- Notes:
--   * When the source has no rows (or only NULL partition values) there is
--     no date range to split on, and the procedure returns without running
--     sql_string.
CREATE OR REPLACE PROCEDURE jake_test.split_dml(
  dest_dataset_id STRING,
  dest_table_id STRING,
  sql_string STRING,
  src_table_ref STRING
)
BEGIN
  DECLARE source_table_ref STRING DEFAULT COALESCE(src_table_ref, "temp_ext");
  DECLARE years_per_part INT64 DEFAULT 10;
  DECLARE date_fmt STRING DEFAULT "%Y-%m-%d";
  DECLARE i INT64 DEFAULT 1;
  DECLARE partition_col, partition_data_type STRING;
  DECLARE source_ref_slug, source_ref_pattern STRING;
  DECLARE temp_table_ref, part_ref, sub_select STRING;
  DECLARE partition_sets ARRAY<DATE>;
  DECLARE part_from, part_to DATE;

  -- Determine the partition column of the target table from INFORMATION_SCHEMA.
  -- The dataset is an identifier and has to be interpolated; the table name is
  -- a value and is bound as a query parameter instead.
  EXECUTE IMMEDIATE
    FORMAT(
      """
      SELECT
        column_name,
        data_type
      FROM
        `%s`.INFORMATION_SCHEMA.COLUMNS
      WHERE
        table_name = @table_name
        AND is_partitioning_column = 'YES'
      """,
      dest_dataset_id
    )
    INTO partition_col, partition_data_type
    USING dest_table_id AS table_name;

  IF partition_col IS NULL THEN
    RAISE USING MESSAGE = FORMAT(
      "could not determine partition column for target table %s.%s",
      dest_dataset_id,
      dest_table_id);
  END IF;

  -- The date-window split below is meaningless for integer-range partitioning;
  -- fail early with a clear message instead of an obscure CAST error later.
  IF partition_data_type NOT IN ("DATE", "DATETIME", "TIMESTAMP") THEN
    RAISE USING MESSAGE = FORMAT(
      "partition column %s of target table %s.%s has type %s; split_dml only supports DATE, DATETIME and TIMESTAMP partition columns",
      partition_col,
      dest_dataset_id,
      dest_table_id,
      partition_data_type);
  END IF;

  -- Identifier-safe form of the source reference. Every character that is not
  -- valid in an unquoted identifier (".", ":", "-", "`", ...) becomes "_", so
  -- project ids containing hyphens still yield a usable temp table name.
  SET source_ref_slug = REGEXP_REPLACE(source_table_ref, r'[^a-zA-Z0-9_]', "_");
  SET temp_table_ref = FORMAT("temp_%s", source_ref_slug);

  -- Regex matching the source reference literally: escape every non-word
  -- character so that e.g. "." in "dataset.table" does not match any character.
  SET source_ref_pattern = FORMAT(
    r'\b%s\b',
    REGEXP_REPLACE(source_table_ref, r'([^a-zA-Z0-9_])', r'\\\1'));

  -- Optimization: instead of scanning the (possibly external) source table
  -- many times, materialize it once into a native temp table clustered on the
  -- partition column and run every subsequent query against that copy.
  EXECUTE IMMEDIATE
    FORMAT(
      """
      CREATE OR REPLACE TEMP TABLE %s
      CLUSTER BY %s
      AS (
        SELECT
          *
        FROM %s
      )
      """,
      temp_table_ref,
      partition_col,
      source_table_ref
    );

  -- Split the covered date range into windows of years_per_part years.
  -- The values are cast to DATE so that TIMESTAMP / DATETIME / STRING source
  -- columns are accepted, consistent with the filter applied per window.
  EXECUTE IMMEDIATE
    FORMAT(
      """
      SELECT
        GENERATE_DATE_ARRAY(
          MIN(CAST(%s AS DATE)),
          -- guarantee the last element will be after the max date
          DATE_ADD(MAX(CAST(%s AS DATE)), INTERVAL %d YEAR),
          INTERVAL %d YEAR
        )
      FROM
        %s
      """,
      partition_col,
      partition_col,
      years_per_part,
      years_per_part,
      temp_table_ref
    )
    INTO partition_sets;

  -- Empty source: MIN/MAX are NULL, so there are no windows to process.
  IF partition_sets IS NULL THEN
    RETURN;
  END IF;

  -- Each iteration handles the half-open window
  -- [partition_sets[i], partition_sets[i + 1]). Stopping one element short of
  -- the end is not an early exit: the last array element is known to be after
  -- the max date, so it only ever serves as an upper bound.
  WHILE i < ARRAY_LENGTH(partition_sets) DO
    SET part_from = partition_sets[ORDINAL(i)];
    SET part_to = partition_sets[ORDINAL(i + 1)];
    SET part_ref = FORMAT(
      "%s_from_%s_to_%s",
      source_ref_slug,
      FORMAT_DATE('%Y%m%d', part_from),
      FORMAT_DATE('%Y%m%d', part_to));

    -- Optimization note: use a sub-select so that the partition predicate can
    -- be pushed down to the target table to avoid unnecessary scanning.
    -- It is deliberately a single line without comments: if the source
    -- reference occurs inside a "--" comment of sql_string, the substituted
    -- text stays within that comment instead of spilling onto new lines.
    SET sub_select = FORMAT(
      "(SELECT * FROM %s WHERE CAST(%s AS DATE) >= DATE '%s' AND CAST(%s AS DATE) < DATE '%s')",
      temp_table_ref,
      partition_col,
      FORMAT_DATE(date_fmt, part_from),
      partition_col,
      FORMAT_DATE(date_fmt, part_to)
    );

    EXECUTE IMMEDIATE
      FORMAT(
        """
        -------------------------------------------------------
        -- NOTE: DML WRAPPED WITH split_dml STORED PROCEDURE --
        -------------------------------------------------------
        -- FOR PART: %s --
        -- ORIGINAL SQL QUERY (rendered with s/%s/%s/g): --
        -------------------------------------------------------
        %s
        """,
        part_ref,
        source_table_ref,
        part_ref,
        REGEXP_REPLACE(sql_string, source_ref_pattern, sub_select)
      );

    SET i = i + 1;
  END WHILE;
END; -- end stored procedure
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment