Skip to content

Instantly share code, notes, and snippets.

@sanealytics
Last active June 20, 2020 19:11
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sanealytics/39884f087e2046e88c8ffce19a2ef1ce to your computer and use it in GitHub Desktop.
Save sanealytics/39884f087e2046e88c8ffce19a2ef1ce to your computer and use it in GitHub Desktop.
def get_elt_queries():
    """Return the parameterised SQL queries used by the ELT step.

    A new dict is built on every call, so callers may modify the result
    without affecting later calls.

    Returns:
        dict mapping query name to SQL text:

        * ``'input_data'`` -- selects ``data`` and ``added_ts`` from
          ``XXX.TABLE`` for a single service, newest rows first.
          Named query parameter: ``@service``.
        * ``'output_merge'`` -- merges ``input_table`` into ``target`` on
          ``some_id``. Matched rows get ``last_updated_ts`` and ``last_tid``
          updated; unmatched rows are inserted whole (``insert row``).
          Named query parameter: ``@tid``.

    NOTE(review): the ``@name`` parameters and ``insert row`` look like
    BigQuery standard SQL -- confirm the target engine.
    """
    return {
        # Parameters: service
        'input_data': """
select
data,
added_ts
from
XXX.TABLE
where
service = @service
order by
added_ts desc
""",
        # Parameters: tid
        # NOTE(review): ``tid as last_tid`` reads a ``tid`` column from
        # input_table, so ``select *`` appears to already include one; adding
        # ``@tid as tid`` would then give ``s`` two columns named ``tid``,
        # which may break ``insert row`` or make ``s.tid`` ambiguous.
        # Also, the matched branch never writes the new ``@tid`` (``last_tid``
        # receives the input's old tid) -- confirm this is intended.
        'output_merge': """
merge into target t
using (
select *,
@tid as tid,
timestamp_utc as last_updated_ts,
tid as last_tid
from input_table
) s
on t.some_id = s.some_id
when matched then
update set
t.last_updated_ts = s.last_updated_ts,
t.last_tid = s.last_tid
when not matched then
insert row;
""",
    }
# Deploy a wrapper around this function to wake up on file drop of some file
def sample_transform_table():
    """Sketch of one ELT transform run that obtains a new ``tid`` for its output.

    Loads the input rows, passes the distinct ``tid`` values found in them to
    ``service_trace_id``, and reads the new ``tid`` for this run from the JSON
    text that call returns. The write-back step is only described in the
    comments at the end; the function has no return statement.

    NOTE(review): this is pseudocode, not runnable Python -- ``....`` is a
    placeholder for arguments that are not shown, and ``get_input_df``,
    ``service_trace_id`` and ``json`` are not defined or imported in the
    visible source.
    """
    # Presumably runs the 'input_data' query from get_elt_queries() with
    # @service bound to 'some_service' -- confirm against get_input_df.
    input_data = get_input_df('input_data', 'some_service') # Returns pandas dataframe
    # Request a trace id for this run, passing the unique values of the
    # input's 'tid' column as a plain list.
    r = service_trace_id('sample_transform_table', ...., input_data['tid'].unique().tolist())
    # r is parsed as JSON; presumably an object, since .get() is used --
    # yields None if it has no 'tid' key.
    tid = json.loads(r).get('tid')
    # When writing data back,
    # For new rows, insert this new tid into the tid column.
    # For updated rows, insert this new tid into the prev_tid column.
    # NOTE(review): the 'output_merge' query in get_elt_queries() names this
    # column last_tid and fills it from the input's tid rather than the new
    # @tid -- reconcile the two.
    # Check out merge example to do all in one step (when possible)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment