Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save alexcombessie/64e5508d87e47685f9b5955cb18a9ee2 to your computer and use it in GitHub Desktop.
Save alexcombessie/64e5508d87e47685f9b5955cb18a9ee2 to your computer and use it in GitHub Desktop.
[Dataiku scenario] Custom trigger to check if all SQL datasets have changed
# LIBRARY LOADING
from dataiku.scenario import Trigger
import dataiku
import pandas as pd
from dataiku.core.sql import SQLExecutor2
import json
# USER INPUT - EDIT THE MAPPING BELOW
# Each entry maps the name of a dataset to monitor (key) to the SQL query
# whose result is used to detect a change in that dataset (value).
# The whole query result is serialized and compared between runs, so replace
# the sample queries with something narrower than "SELECT *".
dataset_checks_with_query = {
    "test_sql_copy": """
SELECT * from "TETESTS_test_sql_copy"
""",
    "test_sql_2_copy": """
SELECT * from "TETESTS_test_sql_2_copy"
""",
}
# SCRIPT
## Nothing below this line needs to be edited.
## Every step prints a "+ PYTRIGGER +" message; read those outputs when debugging.
# Trigger handle; the script calls t.fire() when every monitored query output changed.
t = Trigger()
# API client used by the project-variable helper functions below.
client = dataiku.api_client()
def get_local_project_variables():
    """Return the "local" section of the default project's variables.

    The project is resolved through the module-level ``client`` and its
    variables are re-read on every call.
    """
    current_project = client.get_project(dataiku.default_project_key())
    return current_project.get_variables()["local"]
def update_local_project_variables(dic):
    """Merge ``dic`` into the default project's "local" variables and save.

    Existing local variables are kept; entries from ``dic`` are added or
    overwrite entries with the same key. The full variables document is
    then written back to the project.

    Returns the tuple ``(0, "SUCCESS")``.
    """
    current_project = client.get_project(dataiku.default_project_key())
    all_variables = current_project.get_variables()

    # Work on a copy of the local section, then swap it into the document.
    merged_local = all_variables["local"].copy()
    merged_local.update(dic)
    all_variables["local"] = merged_local

    current_project.set_variables(all_variables)
    return 0, "SUCCESS"
# Main trigger flow: compare each monitored dataset's current query output
# with the output stored in the local project variables, and fire the trigger
# only when ALL of them differ.

# Read the stored variables once: a single API round-trip, and every dataset
# is compared against the same snapshot.
stored_local_variables = get_local_project_variables()
previous_query_outputs = {
    k: stored_local_variables.get(k) for k in dataset_checks_with_query
}
print("+ PYTRIGGER + Previous query outputs are:")
print(previous_query_outputs)

# One SQL executor per monitored dataset.
executors = {
    k: SQLExecutor2(dataset=dataiku.Dataset(k)) for k in dataset_checks_with_query
}

print("+ PYTRIGGER + Executing queries")
# Each query result is serialized to a JSON string so it can be stored in the
# project variables and compared by plain equality.
new_query_outputs = {
    k: executors[k].query_to_df(v).to_json()
    for k, v in dataset_checks_with_query.items()
}
print("+ PYTRIGGER + New query outputs are:")
print(new_query_outputs)

# True for a dataset whose output differs from the stored one. A dataset with
# no stored output yet (None) always counts as changed.
query_outputs_comparison = {
    k: previous_query_outputs[k] != new_query_outputs[k]
    for k in dataset_checks_with_query
}
print("+ PYTRIGGER + Have query outputs changed? Let's find out!")
print(query_outputs_comparison)

query_outputs_changed = list(query_outputs_comparison.values())
# all([]) is True, so require at least one monitored dataset; otherwise an
# empty configuration would fire the trigger on every evaluation.
if query_outputs_changed and all(query_outputs_changed):
    # Persist the new outputs only when firing, so a partial change keeps
    # being compared against the outputs stored at the last fire.
    update_local_project_variables(new_query_outputs)
    print("+ PYTRIGGER + Local project variables have been updated with new query outputs")
    print("+ PYTRIGGER + A.I.M. Fire!")
    t.fire()
else:
    print("+ PYTRIGGER + Nothing to do")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment