Skip to content

Instantly share code, notes, and snippets.

@geoHeil
Created June 14, 2023 14:08
Show Gist options
  • Save geoHeil/a7ee774e65b597e9ac534e678cf5ecfc to your computer and use it in GitHub Desktop.
Save geoHeil/a7ee774e65b597e9ac534e678cf5ecfc to your computer and use it in GitHub Desktop.
Dagster new DBT API
import os
from typing import Optional
from dagster import MetadataValue, OpExecutionContext, Output
from dagster_dbt.asset_decorator import dbt_assets
from dagster_dbt.cli import DbtCli, DbtManifest
from dagster_dbt.utils import output_name_fn
from dateutil import parser
from My_DagsterProject.resources import DBT_MANIFEST_PATH, DBT_PROFILES_DIR
# Load the dbt manifest once at import time. The same object is reused by the
# asset definition below: in the `@dbt_assets` decorator, in the `dbt.cli(...)`
# call, and when converting raw dbt events into Dagster events.
manifest = DbtManifest.read(path=DBT_MANIFEST_PATH)
def _build_output_metadata(result: dict, node: dict, node_info: dict) -> dict:
    """Assemble the Dagster output metadata for one executed dbt node.

    Args:
        result: The node's entry from ``run_results.json`` (``results`` list).
        node: The node's entry from the executed ``manifest.json``.
        node_info: The ``data.node_info`` payload of the raw dbt event that
            produced the output.

    Returns:
        A metadata dict holding, in order: ``rows_affected`` (only when the
        adapter reported a value), ``compiled_sql`` (only when the node has
        compiled code), and the execution start/end timestamps and duration
        in seconds.
    """
    started_at = parser.isoparse(node_info["node_started_at"])
    completed_at = parser.isoparse(node_info["node_finished_at"])

    metadata = {}

    # Compare against None rather than relying on truthiness: a count of 0 is
    # a legitimate value and must not be dropped.
    rows_affected: Optional[int] = result["adapter_response"].get("rows_affected")
    if rows_affected is not None:
        metadata["rows_affected"] = rows_affected

    compiled_sql: Optional[str] = node.get("compiled_code")
    if compiled_sql:
        metadata["compiled_sql"] = MetadataValue.md(compiled_sql)

    metadata["Execution Started At"] = started_at.isoformat(timespec="seconds")
    metadata["Execution Completed At"] = completed_at.isoformat(timespec="seconds")
    metadata["Execution Duration"] = (completed_at - started_at).total_seconds()
    return metadata


@dbt_assets(manifest=manifest, io_manager_key="warehouse_io_manager")
def dbt_assets(context: OpExecutionContext, dbt: DbtCli):
    """Run ``dbt run`` and yield its asset events enriched with run metadata.

    The dbt target is read from the ``DBT_TARGET`` environment variable and
    defaults to ``"local"``. All raw dbt events are collected first, so that
    ``run_results.json`` and ``manifest.json`` are available before any event
    is yielded. Each ``Output`` event then gets rows-affected, compiled SQL
    and timing metadata attached via ``context.add_output_metadata``.

    Args:
        context: Dagster execution context used for logging and metadata.
        dbt: dbt CLI resource used to launch the ``dbt run`` invocation.

    Yields:
        The Dagster events produced from each raw dbt event, in order.

    Note:
        This function keeps the name ``dbt_assets`` for compatibility, which
        rebinds the module-level name of the imported decorator once the
        definition has been evaluated.
    """
    target = os.environ.get("DBT_TARGET", "local")
    context.log.debug(f"Running dbt with target {target}")
    dbt_cli_task = dbt.cli(
        ["run", "--target", target, "--profiles-dir", DBT_PROFILES_DIR],
        manifest=manifest,
        context=context,
    )

    # Run the task to completion, but don't yield events yet: the artifacts
    # read below only exist after the dbt process has finished.
    events = list(dbt_cli_task.stream_raw_events())

    # NOTE(review): if `is_successful` is a method (not a property) in the
    # installed dagster-dbt version, this attribute access is always truthy
    # and the guard never fires -- confirm against the library version in use.
    if not dbt_cli_task.is_successful:
        return

    # Get the run results after the task has completed.
    run_results = dbt_cli_task.get_artifact("run_results.json")
    executed_manifest = dbt_cli_task.get_artifact("manifest.json")

    # Index both artifacts by Dagster output name so that each Output event
    # can be matched to its dbt result and manifest node.
    results_by_output_name = {
        output_name_fn({"unique_id": result["unique_id"]}): result
        for result in run_results["results"]
    }
    manifest_by_output_name = {
        output_name_fn({"unique_id": unique_id}): node
        for unique_id, node in executed_manifest["nodes"].items()
    }

    # Then, we can use the run results to add metadata to the outputs.
    for event in events:
        for dagster_event in event.to_default_asset_events(manifest=manifest):
            if isinstance(dagster_event, Output):
                output_name = dagster_event.output_name
                context.add_output_metadata(
                    metadata=_build_output_metadata(
                        result=results_by_output_name[output_name],
                        node=manifest_by_output_name[output_name],
                        node_info=event.event["data"]["node_info"],
                    ),
                    output_name=output_name,
                )
            # Every event is forwarded, whether or not metadata was attached.
            yield dagster_event
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment