|
from dagster import op, graph, ConfigurableResource, schedule, EnvVar, Config, RunConfig, multiprocess_executor, OpExecutionContext, In, Nothing, ScheduleEvaluationContext, RunRequest, Definitions |
|
from pydantic import Field |
|
from datetime import datetime |
|
import os |
|
|
|
# ops can be arbitrary python code |
|
@op
def start_processing_infrastucture():
    """Warm up the processing infrastructure.

    Placeholder op: it takes no inputs or config and its body does nothing
    yet, so it implicitly returns ``None``. Downstream ops use it purely as
    an ordering dependency.
    """
    # code to start processing infra
    pass
|
|
|
# ops can also use resources and accept configuration |
|
|
|
# resources are used to represent external systems |
|
# they can be configured based on the runtime environment |
|
# or easily mocked for unit tests |
|
|
|
class MyStorage(ConfigurableResource):
    """Resource representing the external storage that holds the customers list.

    Both methods are placeholders in this file: their bodies are ``...``,
    so they currently do nothing and return ``None``.
    """

    # Name of the bucket/container to use; defaults to "test" when not configured.
    blob_storage: str = Field(default="test", description="Bucket to use for customers list")
    # Credentials for the storage backend; required (no default is declared).
    creds: str

    def push_customers(self, customers: dict):
        """Write ``customers`` to storage.

        Placeholder: the intended implementation (sketched below) would
        return the path the customers were stored under.
        """
        # some code to write clients using the container_client
        # eg container_client = blob_service_client.get_container_client(container=self.blob_storage)
        # return path_to_stored_customers
        ...

    def read_customers(self, path: str):
        """Read back the customers stored at ``path`` (placeholder, not yet implemented)."""
        ...
|
|
|
# op configuration is done through typed pydantic classes |
|
class MyConfig(Config):
    """Typed configuration schema for ops that fetch customers in batches."""

    # Number of customers per batch; no default is declared, so a value
    # has to be provided wherever this config is used.
    batch_size: int
|
|
|
|
|
@op
def push_customers_to_storage(config: MyConfig, storage: MyStorage):
    """Fetch a batch of customers and push it to storage.

    Args:
        config: Op configuration carrying ``batch_size``.
        storage: Storage resource used to persist the customers.

    NOTE(review): the body is still a placeholder and returns ``None``,
    while the graph wires this op's output into ``process_customers``'s
    ``customer_path: str`` input. The sketch below shows the intended
    return value (the path in storage).
    """
    # Intended implementation:
    #   batch_size = config.batch_size
    #   customers = some_function_to_get_customers(batch_size)
    #   path_in_storage = storage.push_customers(customers)
    #   return path_in_storage
    pass
|
|
|
# ops can depend on the return values of upstreams |
|
# or they can use "nothing" dependencies |
|
@op(ins={"infra": In(Nothing)})
def process_customers(context: OpExecutionContext, storage: MyStorage, customer_path: str):
    """Process the staged customers once the infrastructure is warm.

    Args:
        context: Op execution context; used here for structured logging.
        storage: Storage resource the customers would be read from.
        customer_path: Location of the staged customers, produced upstream.

    The ``infra`` input is declared as ``Nothing``: it only orders this op
    after its upstream and carries no value, so it is not a function
    parameter.
    """
    # dagster captures stdout / stderr automatically, but ops can also
    # emit structured logs to the dagster event log
    run_timestamp = datetime.today()
    context.log.info(f"Running the customers for {run_timestamp}")

    # Intended implementation:
    #   customers = storage.read_customers(customer_path)
    #   some_function_to_process_customers_on_infra(customers)
|
|
|
|
|
# graphs combine ops to create the DAG. graphs can be re-used across different jobs, schedules, or nested into larger graphs |
|
# if the graph will only be used once and you want more concise code, you can skip @graph |
|
# and use @job to define a job directly from ops |
|
|
|
@graph
def prep_and_run_customers():
    """Warm the infra and stage customers, then process them.

    The two upstream ops do not depend on each other; ``process_customers``
    depends on both of them.
    """
    # many options exist for defining the dag, see https://docs.dagster.io/concepts/ops-jobs-graphs/graphs
    infra_ready = start_processing_infrastucture()
    staged_customers_path = push_customers_to_storage()
    process_customers(infra=infra_ready, customer_path=staged_customers_path)
|
|
|
|
|
# jobs are a specific instance of a graph with associated config |
|
# Job: the prep_and_run_customers graph bound to a fixed batch size of 50.
prep_and_run_customer_batch50 = prep_and_run_customers.to_job(
    name="prep_and_run_customer_batch50",
    # per-op config, keyed by op name
    config=RunConfig(
        ops={"push_customers_to_storage": MyConfig(batch_size=50)},
    ),
    # executors control how the ops are executed within a run
    executor_def=multiprocess_executor,
)
|
|
|
# schedules can be totally customized |
|
@schedule(job=prep_and_run_customer_batch50, cron_schedule="0 0 * * *")
def run_except_holidays(context: ScheduleEvaluationContext):
    """Schedule for the batch-50 job, intended to skip business holidays.

    Args:
        context: Schedule evaluation context; provides the scheduled
            execution time of the tick being evaluated.

    NOTE(review): the holiday check is still a commented-out sketch, so
    this function currently returns ``None`` on every tick and never
    builds a ``RunRequest``. Presumably Dagster treats that as a skipped
    tick; confirm against the schedule documentation.
    """
    tick_time = context.scheduled_execution_time
    # Only consumed by the sketch below until the holiday check is implemented.
    scheduled_date = tick_time.strftime("%Y-%m-%d")

    # Intended implementation:
    #   if scheduled_date not in get_business_holidays():
    #       return RunRequest()
    ...
|
|
|
def get_env():
    """Return the name of the environment the code is running in.

    Returns:
        ``"PROD"`` when the ``PROD`` environment variable is set to any
        value (an empty string counts as set), otherwise ``"LOCAL"``.
    """
    return "PROD" if "PROD" in os.environ else "LOCAL"
|
|
|
# Everything Dagster should load from this module: the job, its schedule,
# and the resources the ops request by parameter name.
defs = Definitions(
    jobs=[prep_and_run_customer_batch50],
    schedules=[run_except_holidays],
    resources={
        "storage": MyStorage(
            # determine resource config based on where the code is running
            blob_storage=get_env(),
            # EnvVar is a light wrapper around os.getenv that makes this
            # config show as a secret in the Dagster resource UI
            creds=EnvVar("CREDS"),
        ),
    },
)