Skip to content

Instantly share code, notes, and snippets.

{#- Render the filter macro over the clean_orders model, keeping rows whose state column equals 'TX'. -#}
{{- select_all_filter_by(
    table=ref('clean_orders'),
    filter_col='state',
    filter_val='TX'
) -}}
{#-
    select_all_filter_by(table, filter_col, filter_val)

    Emits a SELECT * over `table`, restricted to the rows where `filter_col`
    equals `filter_val`.

    table      -- relation to read from (rendered as-is into the FROM clause)
    filter_col -- column name rendered as-is on the left of the comparison
    filter_val -- value rendered between single quotes, so it is always
                  compared as a string literal; it is not escaped, so a value
                  containing a single quote will break the generated SQL
-#}
{%- macro select_all_filter_by(table, filter_col, filter_val) %}
SELECT *
FROM {{ table }}
WHERE {{ filter_col }} = '{{ filter_val }}'
{% endmacro -%}
-- Projects the customer_* and order* columns of the raw orders table.
SELECT
    customer_id,
    customer_name,
    order_datetime,
    order_number,
    ordered_products
FROM
    `hive_metastore`.`dbt_databricks`.`raw_orders`
{#- Gather the names of this relation's columns that start with 'customer' or 'order'. -#}
{%- set relation_columns = adapter.get_columns_in_relation(this) -%}
{%- set selected_names = [] -%}
{%- for relation_column in relation_columns %}
    {%- set column_name = relation_column.column -%}
    {%- if column_name.startswith('customer') or column_name.startswith('order') %}
        {#- append() returns None; the throwaway `_` only exists so `set` can host the call. -#}
        {%- set _ = selected_names.append(column_name) -%}
    {% endif -%}
{% endfor -%}
{#- NOTE(review): no FROM clause follows in the visible source; confirm the model supplies one. -#}
SELECT
{{ selected_names|join(',\n') }}
# dbt source declaration: exposes the raw_orders and customers tables of the
# dbt_databricks source (in the hive_metastore database) so that models can
# reference them with source('dbt_databricks', '<table>').
version: 2

sources:
  - name: dbt_databricks
    # `database` and `tables` must be nested under the source entry; at
    # column 0 they would parse as unrelated top-level keys.
    database: hive_metastore
    tables:
      - name: raw_orders
      - name: customers
-- Rows of the clean_orders model whose state is 'TX'.
select *
from {{ ref('clean_orders') }}
where state = 'TX'
-- Enriches each raw order with the state and city of its customer.
-- LEFT JOIN: orders without a matching customer row are kept, with
-- state and city left NULL.
SELECT
    r.customer_id,
    r.customer_name,
    r.date,
    r.order_number,
    r.products,
    c.state,
    c.city
FROM {{ source('dbt_databricks', 'raw_orders') }} AS r
LEFT JOIN {{ source('dbt_databricks', 'customers') }} AS c
    ON  r.customer_id   = c.customer_id
    AND r.customer_name = c.customer_name
-- Upstream streaming tables read by clean_orders below. The `...` in each
-- statement is a placeholder for the real source query, which is not shown.
CREATE OR REFRESH STREAMING LIVE TABLE raw_orders AS (SELECT * FROM ...);
CREATE OR REFRESH STREAMING LIVE TABLE customers AS (SELECT * FROM ...);
-- Enriches each streamed raw order with the state and city of its customer.
-- raw_orders is read as a stream; customers is read through a plain LIVE
-- reference. LEFT JOIN keeps orders that have no matching customer row.
CREATE OR REFRESH STREAMING LIVE TABLE clean_orders AS (
    SELECT
        r.customer_id,
        r.customer_name,
        r.date,
        r.order_number,
        r.products,
        c.state,
        c.city
    FROM STREAM(LIVE.raw_orders) AS r
    LEFT JOIN LIVE.customers AS c
        ON r.customer_id = c.customer_id
);
@lightle
lightle / airflow_false_catchup.py
Created June 16, 2022 07:31
For the Airflow start_date CI pipeline article
# Daily "sample_dag" whose schedule starts on 2022-01-01. catchup=False is
# passed so that, per the Airflow docs, runs for intervals already elapsed
# since start_date are not backfilled.
dag = DAG(
    dag_id="sample_dag",
    schedule_interval="@daily",
    # Month and day are written as plain 1: zero-padded integer literals
    # such as 01 are a SyntaxError in Python 3.
    start_date=dt.datetime(2022, 1, 1),
    catchup=False,
)
@lightle
lightle / airflow_dag_execution_date.py
Created June 7, 2022 05:57
For the article Understanding airflow start_date and execution_date
# Daily "sample_dag" whose schedule starts on 2022-01-01.
dag = DAG(
    dag_id="sample_dag",
    schedule_interval="@daily",
    # Month and day are written as plain 1: zero-padded integer literals
    # such as 01 are a SyntaxError in Python 3.
    start_date=dt.datetime(2022, 1, 1),
)
def _print_execution_date(ds):
print(f"The execution date of this flow is {ds}")
print_dag = PythonOperator(