Skip to content

Instantly share code, notes, and snippets.

@gabcoyne
Last active August 13, 2021 00:09
Show Gist options
  • Save gabcoyne/9fe4f029efffc7cb1f59616479c76351 to your computer and use it in GitHub Desktop.
Save gabcoyne/9fe4f029efffc7cb1f59616479c76351 to your computer and use it in GitHub Desktop.
import shutil

import prefect
import pygit2
from prefect import task
from prefect.client import Secret
from prefect.triggers import all_finished
@task(name="Clone DBT")
def pull_dbt_repo():
    """Clone the ``slate-data/slate-dbt`` GitHub repository into ``./dbt``.

    Any existing ``dbt`` directory is deleted first so every run starts from a
    fresh checkout. The access token is read from the Prefect Secret
    ``GITHUB_ACCESS_TOKEN``.
    """
    logger = prefect.context.get("logger")
    dbt_repo_name = "slate-dbt"
    dest = "dbt"

    # Delete the folder on every run; a missing folder is not an error.
    shutil.rmtree(dest, ignore_errors=True)

    git_token = Secret("GITHUB_ACCESS_TOKEN").get()
    dbt_repo = f"https://github.com/slate-data/{dbt_repo_name}"

    # Hand the token to libgit2 via credential callbacks instead of putting it
    # in the URL: a URL with embedded credentials is saved as the "origin"
    # remote in the clone's .git/config and would also leak into any log line
    # that prints the URL. Username/password mirror the former
    # "<token>:x-oauth-basic@" form.
    callbacks = pygit2.RemoteCallbacks(
        credentials=pygit2.UserPass(git_token, "x-oauth-basic")
    )
    logger.info("Cloning %s into %s", dbt_repo, dest)
    pygit2.clone_repository(dbt_repo, dest, callbacks=callbacks)
from prefect import task
from prefect.client import Secret
from prefect.tasks.dbt.dbt import DbtShellTask
# Snowflake connection settings for the dbt profile that DbtShellTask writes
# (overwrite_profiles=True). Authentication is user/password.
# NOTE(review): every Secret(...).get() below runs when this module executes,
# not when a task runs, so the resolved credentials live on this module and on
# the task object for the lifetime of the flow (and in any serialized copy of
# it) — confirm that is acceptable.
# NOTE(review): "DBT__SNOWFLAKE__WAREHOUSE" has a double underscore before
# WAREHOUSE, unlike the other DBT__SNOWFLAKE_* names — verify it matches the
# secret actually stored.
_snowflake_target = dict(
    type="snowflake",
    account=Secret("SNOWFLAKE_ACCOUNT").get(),
    user=Secret("DBT__SNOWFLAKE_USER").get(),
    password=Secret("DBT__SNOWFLAKE_PASS").get(),
    role=Secret("DBT__SNOWFLAKE_ROLE").get(),
    database=Secret("DBT__SNOWFLAKE_DATABASE").get(),
    warehouse=Secret("DBT__SNOWFLAKE__WAREHOUSE").get(),
    schema=Secret("DBT__SCHEMA").get(),
    threads=12,
    client_session_keep_alive=False,
)

# One shared dbt task; each call inside the flow supplies its own ``command``.
dbt = DbtShellTask(
    profile_name="snowflake_slate",
    environment="dev",
    overwrite_profiles=True,
    dbt_kwargs=_snowflake_target,
    # Each command is run after changing into the "dbt" folder that the
    # clone task checks the project out into.
    helper_script="cd dbt",
    # Return every output line so output_print can log them.
    return_all=True,
    log_stdout=True,
    log_stderr=True,
)
@task(trigger=all_finished)
def output_print(output):
    """Log each line of an upstream task's output at INFO level.

    Uses the ``all_finished`` trigger, so it also runs after the upstream task
    failed, which lets the output of a failing dbt command still be logged.

    Args:
        output: The upstream result, normally a list of output lines (the dbt
            task is built with ``return_all=True``). ``None`` logs a short
            notice instead. A single string is logged as one line rather than
            one character per line. Any other non-iterable value is logged
            as-is.
    """
    logger = prefect.context.get("logger")

    # A failed upstream may hand over no result at all; iterating None would
    # raise TypeError and fail this task.
    if output is None:
        logger.info("No output to print.")
        return

    if isinstance(output, str):
        lines = [output]
    else:
        try:
            lines = iter(output)
        except TypeError:
            # e.g. an exception object passed through from a failed upstream
            lines = [output]

    for line in lines:
        logger.info(line)
from prefect import Flow
# Daily dbt pipeline: clone the project, install dbt packages, run the models
# tagged "daily", then test them, logging each dbt command's output.
with Flow(
    "DBT Daily",
) as f:
    pull_repo = pull_dbt_repo()

    deps = dbt(
        command="dbt deps",
        task_args={"name": "DBT: Dependencies"},
        upstream_tasks=[pull_repo],
    )
    deps_output = output_print(
        deps,
        task_args={"name": "DBT: Dependency Output"},
    )

    # Gate on ``deps`` itself. output_print uses the all_finished trigger, so
    # ``deps_output`` runs (and can succeed) even when ``dbt deps`` fails.
    # Depending on it alone would start the run after a failed install.
    # ``deps_output`` stays upstream so the dependency log is written first.
    run_daily = dbt(
        command="dbt run -m tag:daily",
        task_args={"name": "DBT: Run Dailies"},
        upstream_tasks=[pull_repo, deps, deps_output],
    )
    output_print(run_daily, task_args={"name": "DBT Daily Output"})

    test = dbt(
        command="dbt test -m tag:daily",
        task_args={"name": "DBT: Test Dailies"},
        upstream_tasks=[pull_repo, run_daily],
    )
    output_print(test)

# Only run the flow when executed as a script. Importing this module (e.g. for
# flow registration) then builds the flow without executing it.
if __name__ == "__main__":
    f.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment