This example shows how to create an asset graph where the root assets are run by various schedules and the downstream assets are updated by varying auto-materialization policies.
To install:
pip install dagster dagster-webserver
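A minimal sketch of the pattern described above, assuming a hypothetical root asset on a daily schedule and a downstream asset that auto-materializes when the root updates (asset, job, and cron values are illustrative, not from the original project):

from dagster import (
    AutoMaterializePolicy,
    Definitions,
    ScheduleDefinition,
    asset,
    define_asset_job,
)

# Root asset, refreshed by a schedule rather than by auto-materialization.
@asset
def raw_orders():
    # placeholder: fetch source data here
    return [1, 2, 3]

# Downstream asset that auto-materializes whenever raw_orders is updated
# (requires the auto-materialization daemon to be enabled).
@asset(auto_materialize_policy=AutoMaterializePolicy.eager())
def orders_summary(raw_orders):
    return sum(raw_orders)

# Schedule only the root; the daemon handles the downstream.
raw_orders_job = define_asset_job("raw_orders_job", selection=["raw_orders"])
raw_orders_schedule = ScheduleDefinition(job=raw_orders_job, cron_schedule="0 6 * * *")

defs = Definitions(
    assets=[raw_orders, orders_summary],
    jobs=[raw_orders_job],
    schedules=[raw_orders_schedule],
)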
from typing import Any
from dagster import ConfigurableResource, ConfigurableIOManager, InputContext, OutputContext, asset, Definitions, ResourceDependency, EnvVar
from pydantic import Field

# https://docs.dagster.io/concepts/resources#resources-that-depend-on-other-resources
class myResource(ConfigurableResource):
    username: str = Field(description="the username")
    password: str = Field(description="the password")
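A sketch of the resource-on-resource dependency pattern the comment links to, assuming a hypothetical myService resource that wraps myResource via ResourceDependency (the resource keys and env var names are illustrative):

credentials = myResource(
    username=EnvVar("MY_USERNAME"),
    password=EnvVar("MY_PASSWORD"),
)

class myService(ConfigurableResource):
    # nested resource: a myResource instance is passed in when myService is constructed
    credentials: ResourceDependency[myResource]

defs = Definitions(
    resources={
        "credentials": credentials,
        "service": myService(credentials=credentials),
    },
)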
import os
from dagster import define_asset_job, load_assets_from_package_module, repository, with_resources, op, job, ScheduleDefinition
from my_dagster_project import assets
from datadog_api_client import ApiClient, Configuration
from datadog_api_client.v2.api.metrics_api import MetricsApi
from datadog_api_client.v2.model.metric_intake_type import MetricIntakeType
from datadog_api_client.v2.model.metric_payload import MetricPayload
from datadog_api_client.v2.model.metric_point import MetricPoint
from datetime import datetime
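A hedged sketch of what these imports might be used for: an op that submits a run-count metric to Datadog. The metric name, the op itself, and the extra MetricSeries import are assumptions, not part of the original snippet.

from datadog_api_client.v2.model.metric_series import MetricSeries  # needed to build the payload

@op
def report_run_to_datadog():
    # one data point: a single completed run at the current timestamp
    payload = MetricPayload(
        series=[
            MetricSeries(
                metric="dagster.runs.completed",
                type=MetricIntakeType.COUNT,
                points=[MetricPoint(timestamp=int(datetime.now().timestamp()), value=1.0)],
            )
        ]
    )
    # DD_API_KEY / DD_SITE are read from the environment by Configuration()
    with ApiClient(Configuration()) as api_client:
        MetricsApi(api_client).submit_metrics(body=payload)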
import datetime
import pins
import os
import seaborn as sns
from dagster import asset, asset_check, AssetCheckResult
from posit import connect  # install with: uv pip install posit-sdk
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
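A sketch of the kind of asset and asset check these imports suggest: train a scikit-learn model and validate it with a check. The dataset, asset names, and accuracy threshold are hypothetical placeholders; publishing via pins/Posit Connect is omitted here.

@asset
def churn_model():
    # placeholder training data; a real project would load it from a source asset
    X, y = [[0.0], [1.0], [2.0], [3.0]], [0, 0, 1, 1]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=0)
    model = Pipeline([("clf", LogisticRegression())]).fit(X_train, y_train)
    return {"model": model, "score": model.score(X_test, y_test)}

@asset_check(asset=churn_model)
def churn_model_accuracy(churn_model):
    # fail the check if held-out accuracy drops below an arbitrary threshold
    return AssetCheckResult(
        passed=churn_model["score"] >= 0.5,
        metadata={"score": churn_model["score"]},
    )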
from dagster import asset

# Attach a standard owner to every asset created with this decorator,
# so individual users do not have to set it themselves.
def bi_team_asset(**asset_decorator_kwargs):
    def _wrapper(f):
        @asset(**asset_decorator_kwargs, owners=["bi@corp.org"], name=f.__name__)
        def _impl(**kwargs):
            return f(**kwargs)
        return _impl
    return _wrapper
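Usage might look like the following; the asset body and the extra decorator kwarg are illustrative:

@bi_team_asset(group_name="reporting")
def monthly_revenue():
    # every asset defined this way is automatically owned by bi@corp.org
    return 42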
[
    {
        "dag_id": "powerbi1",
        "python_script": "some_script.py",
        "dataset_id": "fdsjl4539fdjsk"
    },
    {
        "dag_id": "powerbi2",
        "python_script": "some_other_script.py",
        "dataset_id": "89fdskfds0"
    }
]
This example shows a skeleton for how to build a Dagster project that extracts tables from SQL Server, stores the extract as a CSV in GCS, and then uploads the GCS extract to BigQuery.
The actual extract and load logic is omitted; the purpose of this project is to show how such a pipeline can be represented as Dagster assets.
First, a single pipeline for one table is created, as demonstrated in the file dagster_mock_one_table.py. To run this example:
pip install dagster dagster-webserver
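Since the extract and load logic is omitted in the project itself, here is a minimal sketch of how the single-table pipeline in dagster_mock_one_table.py might be shaped as three chained assets (the table, bucket, and dataset names are placeholders):

from dagster import Definitions, asset

@asset
def sql_server_extract():
    # placeholder: SELECT from SQL Server and write the rows to a local CSV
    return "/tmp/orders.csv"

@asset
def gcs_extract(sql_server_extract):
    # placeholder: upload the CSV to a GCS bucket and return its URI
    return "gs://my-bucket/orders.csv"

@asset
def bigquery_table(gcs_extract):
    # placeholder: load the GCS object into a BigQuery table
    return "my_project.my_dataset.orders"

defs = Definitions(assets=[sql_server_extract, gcs_extract, bigquery_table])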
profile:
  name: FN
  stocks_to_index:
    - ticker: NFLX
    - ticker: META
  index_strategy:
    type: equal
forecast:
  days: 60
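One way this YAML might map onto Dagster's config system is a small pydantic-style Config schema; the class names below mirror the file's keys but are assumptions, as is the filename:

from typing import List
import yaml
from dagster import Config, asset

class Stock(Config):
    ticker: str

class IndexStrategy(Config):
    type: str

class Profile(Config):
    name: str
    stocks_to_index: List[Stock]
    index_strategy: IndexStrategy

class Forecast(Config):
    days: int

@asset
def index_definition(config: Profile):
    # placeholder: build an equal-weighted index from the configured tickers
    return [stock.ticker for stock in config.stocks_to_index]

# validate the YAML above against the schema (hypothetical filename)
with open("profile.yaml") as f:
    raw = yaml.safe_load(f)
profile = Profile(**raw["profile"])
forecast = Forecast(**raw["forecast"])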
with events as (select distinct
    DATE_FORMAT(timestamp, '%Y-%m') as event_month,
    dagster_event_type,
    concat(run_id, '||', step_key) as step_id,  -- composite key of run and step
    count(1) as credits
from event_logs
where dagster_event_type = 'STEP_START'
# Run with: dagster dev -f observable_source_assets1.py, then enable the auto-materialization daemon.
# The downstream assets will update as new versions of the live data are observed.
from dagster import observable_source_asset, asset, AutoMaterializePolicy, DataVersion
from datetime import datetime

def today():
    return datetime.today().date()

def now():
    return datetime.now()
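A sketch of how the rest of this file might continue, based on the comments above: an observable source asset that reports a DataVersion, plus an eagerly auto-materialized downstream. The asset names and bodies are placeholders.

@observable_source_asset
def live_source():
    # the observation daemon records a new data version whenever the date changes
    return DataVersion(str(today()))

@asset(deps=[live_source], auto_materialize_policy=AutoMaterializePolicy.eager())
def daily_snapshot():
    # re-materialized automatically when live_source reports a new data version
    return f"snapshot taken at {now()}"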