-
-
Save geoHeil/a7ee774e65b597e9ac534e678cf5ecfc to your computer and use it in GitHub Desktop.
Dagster new dbt API
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os
from typing import Any, Dict, Optional

from dagster import MetadataValue, OpExecutionContext, Output
from dagster_dbt.asset_decorator import dbt_assets
from dagster_dbt.cli import DbtCli, DbtManifest
from dagster_dbt.utils import output_name_fn
from dateutil import parser

from My_DagsterProject.resources import DBT_MANIFEST_PATH, DBT_PROFILES_DIR
# Parsed once at import time: the manifest is needed both by the decorator
# (to build the asset definitions) and at run time (to translate dbt events).
manifest = DbtManifest.read(path=DBT_MANIFEST_PATH)


def _build_output_metadata(
    node_info: Dict[str, Any],
    result: Optional[Dict[str, Any]],
    node: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """Build the Dagster output metadata for one executed dbt node.

    Args:
        node_info: The ``node_info`` payload of the dbt log event that produced
            the output. It must contain ISO-8601 ``node_started_at`` and
            ``node_finished_at`` timestamps.
        result: The node's entry from ``run_results.json``, or ``None`` when
            no matching entry was found.
        node: The node's entry from the executed ``manifest.json``, or ``None``
            when no matching entry was found.

    Returns:
        A metadata dict with ``rows_affected`` and ``compiled_sql`` (when
        available), followed by execution start/end timestamps and duration
        in seconds.
    """
    metadata: Dict[str, Any] = {}

    if result is not None:
        adapter_response = result.get("adapter_response") or {}
        rows_affected: Optional[int] = adapter_response.get("rows_affected")
        # Test against None rather than truthiness: 0 affected rows is still
        # meaningful information and must not be dropped.
        if rows_affected is not None:
            metadata["rows_affected"] = rows_affected

    if node is not None:
        compiled_sql: Optional[str] = node.get("compiled_code")
        if compiled_sql:
            # Fence the SQL so the Markdown renderer shows it verbatim instead
            # of interpreting characters such as `*` and `_` as formatting.
            metadata["compiled_sql"] = MetadataValue.md(
                f"```sql\n{compiled_sql}\n```"
            )

    started_at = parser.isoparse(node_info["node_started_at"])
    completed_at = parser.isoparse(node_info["node_finished_at"])
    metadata["Execution Started At"] = started_at.isoformat(timespec="seconds")
    metadata["Execution Completed At"] = completed_at.isoformat(timespec="seconds")
    metadata["Execution Duration"] = (completed_at - started_at).total_seconds()

    return metadata


# NOTE: this function's name shadows the imported `dbt_assets` decorator. That
# works (the decorator is resolved before the name is rebound), but the module
# attribute `dbt_assets` ends up being the asset definition. The name is kept
# as-is because other modules may import it.
@dbt_assets(manifest=manifest, io_manager_key="warehouse_io_manager")
def dbt_assets(context: OpExecutionContext, dbt: DbtCli):
    """Run ``dbt run`` and yield one metadata-enriched ``Output`` per model.

    The dbt target is taken from the ``DBT_TARGET`` environment variable
    (default ``"local"``). All dbt events are buffered until the process
    exits, because ``run_results.json`` and the executed ``manifest.json``
    are only available afterwards. Each ``Output`` is then yielded with
    rows-affected, compiled-SQL and timing metadata attached. Only ``Output``
    events are yielded; other event types are dropped.
    """
    target = os.environ.get("DBT_TARGET", "local")
    context.log.debug(f"Running dbt with target {target}")

    dbt_cli_task = dbt.cli(
        ["run", "--target", target, "--profiles-dir", DBT_PROFILES_DIR],
        manifest=manifest,
        context=context,
    )

    # Run the task to completion without yielding events yet: the artifacts
    # used for metadata only exist once dbt has finished.
    events = list(dbt_cli_task.stream_raw_events())

    # `is_successful` is a method; testing the bare attribute is always truthy.
    if not dbt_cli_task.is_successful():
        context.log.error(
            "dbt run did not complete successfully; no outputs will be emitted."
        )
        return

    run_results = dbt_cli_task.get_artifact("run_results.json")
    executed_manifest = dbt_cli_task.get_artifact("manifest.json")

    # Index both artifacts by Dagster output name so each Output can be
    # matched to its dbt run result and its compiled manifest node.
    results_by_output_name = {
        output_name_fn({"unique_id": run_result["unique_id"]}): run_result
        for run_result in run_results["results"]
    }
    manifest_by_output_name = {
        output_name_fn({"unique_id": unique_id}): node
        for unique_id, node in executed_manifest["nodes"].items()
    }

    for event in events:
        for dagster_event in event.to_default_asset_events(manifest=manifest):
            if not isinstance(dagster_event, Output):
                continue
            output_name = dagster_event.output_name
            context.add_output_metadata(
                metadata=_build_output_metadata(
                    node_info=event.event["data"]["node_info"],
                    result=results_by_output_name.get(output_name),
                    node=manifest_by_output_name.get(output_name),
                ),
                output_name=output_name,
            )
            yield dagster_event
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment