Skip to content

Instantly share code, notes, and snippets.

@lightle
Last active April 15, 2024 07:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lightle/ed2e3ad7f134c09b5766d1eed88812d1 to your computer and use it in GitHub Desktop.
Save lightle/ed2e3ad7f134c09b5766d1eed88812d1 to your computer and use it in GitHub Desktop.
import dlt
# Prefix prepended to the table names that clean_orders reads via spark.read.table.
# NOTE(review): "<catalog>.<schema>." looks like a placeholder to be filled in
# with a real catalog and schema before running — confirm.
source = "<catalog>.<schema>."
@dlt.table
def clean_orders():
    """Left-join raw orders to customer orders and project the order columns.

    Both inputs are read with ``spark.read.table`` using the ``source``
    prefix. Rows are matched on ``customer_id`` and ``customer_name``, and
    every raw order is kept even when no customer row matches (left join).
    The customer key columns in the result are taken from the raw-orders side.
    """
    orders_df = spark.read.table(f"{source}raw_orders")
    customers_df = spark.read.table(f"{source}customer_orders")

    # Both keys must agree for a customer row to be attached to an order.
    same_customer = [
        orders_df.customer_id == customers_df.customer_id,
        orders_df.customer_name == customers_df.customer_name,
    ]
    joined = orders_df.join(customers_df, on=same_customer, how='left')

    # The key columns are referenced through orders_df so they resolve to the
    # left side of the join; the remaining columns are selected by name.
    output_columns = [
        orders_df.customer_id,
        orders_df.customer_name,
        'order_datetime',
        'order_number',
        'ordered_product',
        'city',
        'state',
        'lat',
        'lon',
    ]
    return joined.select(*output_columns)
@dlt.table
def customer_orders():
    """Project ``clean_orders`` down to its customer- and order-prefixed columns.

    A column is kept when its name starts with ``customer`` or ``order``;
    all other columns are dropped.
    """
    orders = dlt.read("clean_orders")
    wanted_prefixes = ('customer', 'order')
    kept_columns = [
        name for name in orders.columns if name.startswith(wanted_prefixes)
    ]
    return orders.select(kept_columns)
def select_all_filter_by(table, filter_column, filter_value):
    """Return every row of a pipeline dataset whose column equals a value.

    Args:
        table: Name of the dataset to load with ``dlt.read``.
        filter_column: Name of the column to compare.
        filter_value: Value the column must equal.

    Returns:
        The DataFrame produced by ``dlt.read(table)``, restricted to the rows
        where ``filter_column`` equals ``filter_value``.
    """
    dataset = dlt.read(table)
    # Compare through a Column expression instead of splicing the value into
    # a SQL string: a value containing a single quote (e.g. "Coeur d'Alene")
    # would otherwise produce a malformed or unintended predicate.
    return dataset.filter(dataset[filter_column] == filter_value)
@dlt.table
def new_york_orders():
    """Rows of ``clean_orders`` whose ``state`` column equals ``NY``."""
    matching_rows = select_all_filter_by(
        table='clean_orders',
        filter_column='state',
        filter_value='NY',
    )
    return matching_rows
@dlt.table
def texas_orders():
    """Rows of ``clean_orders`` whose ``state`` column equals ``TX``."""
    matching_rows = select_all_filter_by(
        table='clean_orders',
        filter_column='state',
        filter_value='TX',
    )
    return matching_rows
@dlt.table
def seattle_orders():
    """Rows of ``clean_orders`` whose ``city`` column equals ``Seattle``."""
    matching_rows = select_all_filter_by(
        table='clean_orders',
        filter_column='city',
        filter_value='Seattle',
    )
    return matching_rows
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment