# Libraries: whisker (mustache templating), bigrquery (BigQuery access),
# condusco (runs a pipeline function once per row of an invocation query)
library(whisker)
library(bigrquery)
library(condusco)
# Set GBQ project
project <- '<YOUR_GBQ_PROJECT_ID_HERE>'
# Configuration
config <- data.frame(
  dataset_id   = '<YOUR_GBQ_DATASET_ID_HERE>',
  table_prefix = 'tmp_test_partition',
  stringsAsFactors = FALSE
)
# Set the following options for GBQ authentication on a cloud instance
options(httr_oauth_cache = "~/.httr-oauth")
options(httr_oob_default = TRUE)
# Run a trivial query to trigger authentication and write credentials to the .httr-oauth file
query_exec("SELECT 'foo' AS bar", project = project)
# The pipeline that creates the pivot table
migrating_to_partitioned_step_001_create_pivot <- function(params){

  destination_table <- "{{{dataset_id}}}.{{{table_prefix}}}_partitions"

  query <- "
    SELECT
      {{#date_list}}
      ARRAY_CONCAT_AGG(CASE WHEN d = 'day{{{yyyymmdd}}}' THEN r END) AS day_{{{yyyymmdd}}},
      {{/date_list}}
      line
    FROM (
      SELECT d, r, ROW_NUMBER() OVER(PARTITION BY d) AS line
      FROM (
        SELECT
          stn, CONCAT('day', year, mo, da) AS d, ARRAY_AGG(t) AS r
        FROM `bigquery-public-data.noaa_gsod.gsod2017` AS t
        GROUP BY stn, d
      )
    )
    GROUP BY line
  "

  query_exec(
    whisker.render(query, params),
    project = project,
    destination_table = whisker.render(destination_table, params),
    write_disposition = 'WRITE_TRUNCATE',
    use_legacy_sql = FALSE
  )
}
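# For illustration (hypothetical values): condusco parses a JSON-string field
# such as date_list into a list of named lists before invoking the pipeline,
# so the {{#date_list}} section above expands once per date.
example_params <- list(
  dataset_id   = 'my_dataset',
  table_prefix = 'tmp_test_partition',
  date_list    = list(list(yyyymmdd = '20171206'), list(yyyymmdd = '20171205'))
)
cat(whisker.render(
"{{#date_list}}
ARRAY_CONCAT_AGG(CASE WHEN d = 'day{{{yyyymmdd}}}' THEN r END) AS day_{{{yyyymmdd}}},
{{/date_list}}",
  example_params
))
# Prints one ARRAY_CONCAT_AGG(...) AS day_YYYYMMDD line per date.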
# Run the pipeline that creates the pivot table.
# The invocation query builds a JSON string of dates in its date_list column,
# e.g. [{"yyyymmdd":"20171206"},{"yyyymmdd":"20171205"},...]
invocation_query <- "
SELECT
'{{{dataset_id}}}' as dataset_id,
'{{{table_prefix}}}' as table_prefix,
CONCAT(
'[',
STRING_AGG(
CONCAT('{\"yyyymmdd\":\"',FORMAT_DATE('%Y%m%d',partition_date),'\"}')
),
']'
) as date_list
FROM (
SELECT
DATE_ADD(DATE(CURRENT_DATETIME()), INTERVAL -n DAY) as partition_date
FROM (
SELECT [1,2,3] as n
),
UNNEST(n) AS n
)
"
run_pipeline_gbq(
  migrating_to_partitioned_step_001_create_pivot,
  whisker.render(invocation_query, config),
  project,
  use_legacy_sql = FALSE
)
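# Optional sanity check (a sketch using the config above): count the pivot
# table's rows (one per `line`) to confirm step 001 wrote the table.
print(query_exec(
  whisker.render(
    "SELECT COUNT(*) AS num_lines FROM {{{dataset_id}}}.{{{table_prefix}}}_partitions",
    config
  ),
  project = project,
  use_legacy_sql = FALSE
))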
# The pipeline that creates the individual per-day tables by unnesting
# one day_YYYYMMDD array column of the pivot table at a time
migrating_to_partitioned_step_002_unnest <- function(params){

  destination_table <- "{{{dataset_id}}}.{{{table_prefix}}}_{{{day_partition_date}}}"

  query <- "
    SELECT r.*
    FROM {{{dataset_id}}}.{{{table_prefix}}}_partitions, UNNEST({{{day_partition_date}}}) AS r
  "

  query_exec(
    whisker.render(query, params),
    project = project,
    destination_table = whisker.render(destination_table, params),
    write_disposition = 'WRITE_TRUNCATE',
    use_legacy_sql = FALSE
  )
}
invocation_query <- "
SELECT
'{{{dataset_id}}}' as dataset_id,
'{{{table_prefix}}}' as table_prefix,
CONCAT('day_',FORMAT_DATE('%Y%m%d',partition_date)) as day_partition_date
FROM (
SELECT
DATE_ADD(DATE(CURRENT_DATETIME()), INTERVAL -n DAY) as partition_date
FROM (
SELECT [1,2,3] as n
),
UNNEST(n) AS n
)
"
run_pipeline_gbq(
  migrating_to_partitioned_step_002_unnest,
  whisker.render(invocation_query, config),
  project,
  use_legacy_sql = FALSE
)
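# Final step (a sketch, not executed here): BigQuery's documented migration
# path is to copy each per-day table into the matching partition of a
# date-partitioned table using partition decorators ($YYYYMMDD). This assumes
# a partitioned destination table already exists (e.g. created with
# `bq mk --time_partitioning_type=DAY`) and that the bq CLI is installed and
# authenticated; the dates and `_partitioned` suffix below are hypothetical.
# for (d in c('20171205', '20171206')) {
#   system(sprintf(
#     "bq cp -f %s.%s_day_%s %s.%s_partitioned'$%s'",
#     config$dataset_id, config$table_prefix, d,
#     config$dataset_id, config$table_prefix, d
#   ))
# }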