Ops and Jobs Example

This example shows how to use ops to create a graph of tasks executed as a Dagster job with specific configuration and resources. The example also shows how to use a custom schedule.

To start:

pip install dagster dagit
dagster dev -f ops_example.py

The graph of ops:

[Screenshot: the graph of ops]
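
The shape of the graph can also be sketched in plain Python. The snippet below is only an illustration of the dependency ordering using the standard library's `graphlib`; it is not how Dagster itself executes the graph, and the `execution_stages` helper is a hypothetical name introduced here. The op names are copied from `ops_example.py`.

```python
from graphlib import TopologicalSorter

# Each key maps an op to the set of ops it waits on,
# mirroring the wiring in prep_and_run_customers.
dependencies = {
    "start_processing_infrastucture": set(),
    "push_customers_to_storage": set(),
    "process_customers": {
        "start_processing_infrastucture",
        "push_customers_to_storage",
    },
}


def execution_stages(deps):
    """Group ops into stages; ops within one stage do not depend on each other."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    stages = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable printing
        stages.append(ready)
        sorter.done(*ready)
    return stages


stages = execution_stages(dependencies)
for number, ops in enumerate(stages, start=1):
    print(f"stage {number}: {', '.join(ops)}")
```

The two upstream ops land in the same stage, which is why they can run in parallel before `process_customers` starts.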

The optional configuration for a run:

[Screenshot: the optional configuration for a run]
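
The job in `ops_example.py` sets `batch_size=50` by default, and a run can override it. As a rough sketch, the override has the nested shape below when written as a plain dictionary (the same structure is normally entered as YAML in the UI). The helper name `batch_size_run_config` and the positive-value check are assumptions added for illustration and are not part of the original example.

```python
import json


def batch_size_run_config(batch_size: int) -> dict:
    """Build a run config dict that overrides batch_size for one op (hypothetical helper)."""
    if batch_size <= 0:
        # Assumption for this sketch: a batch must contain at least one customer.
        raise ValueError("batch_size must be positive")
    return {
        "ops": {
            "push_customers_to_storage": {
                "config": {"batch_size": batch_size},
            }
        }
    }


run_config = batch_size_run_config(100)
print(json.dumps(run_config, indent=2))
```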

The schedule:

[Screenshot: the schedule]
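
The schedule in `ops_example.py` fires daily at midnight and is meant to skip business holidays, but the holiday lookup is left as a commented-out stub. A minimal sketch of that check follows, assuming a hard-coded holiday set; the dates, `BUSINESS_HOLIDAYS`, and `should_run` are hypothetical, and a real `get_business_holidays` would likely read from a calendar service or config.

```python
from datetime import datetime

# Hypothetical holiday list, formatted like the schedule's "%Y-%m-%d" string.
BUSINESS_HOLIDAYS = {"2023-12-25", "2024-01-01"}


def get_business_holidays() -> set:
    """Stand-in for the lookup referenced in the schedule's comments."""
    return BUSINESS_HOLIDAYS


def should_run(scheduled_execution_time: datetime) -> bool:
    """Return True when the scheduled date is not a business holiday."""
    scheduled_date = scheduled_execution_time.strftime("%Y-%m-%d")
    return scheduled_date not in get_business_holidays()


print(should_run(datetime(2023, 12, 25)))  # False: listed as a holiday
print(should_run(datetime(2023, 12, 26)))  # True: not in the holiday set
```

Inside the schedule function, a `True` result would correspond to returning `RunRequest()`, as the commented lines in the example suggest.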

from dagster import (
    op,
    graph,
    ConfigurableResource,
    schedule,
    EnvVar,
    Config,
    RunConfig,
    multiprocess_executor,
    OpExecutionContext,
    In,
    Nothing,
    ScheduleEvaluationContext,
    RunRequest,
    Definitions,
)
from pydantic import Field
from datetime import datetime
import os

# ops can be arbitrary python code
@op
def start_processing_infrastucture():
    """Warm up processing infra"""
    ...
    # code to start processing infra

# ops can also use resources and accept configuration
# resources are used to represent external systems
# they can be configured based on the runtime environment
# or easily mocked for unit tests
class MyStorage(ConfigurableResource):
    blob_storage: str = Field("test", description="Bucket to use for customers list")
    creds: str

    def push_customers(self, customers: dict):
        ...
        # some code to write clients using the container_client
        # eg container_client = blob_service_client.get_container_client(container=self.blob_storage)
        # return path_to_stored_customers

    def read_customers(self, path: str):
        ...

# op configuration is done through typed pydantic classes
class MyConfig(Config):
    batch_size: int


@op
def push_customers_to_storage(config: MyConfig, storage: MyStorage):
    """Get batch of customers and push to storage"""
    # batch_size = config.batch_size
    # customers = some_function_to_get_customers(batch_size)
    # path_in_storage = storage.push_customers(customers)
    # return path_in_storage
    ...

# ops can depend on the return values of upstreams
# or they can use "nothing" dependencies
@op(
    ins={"infra": In(Nothing)}
)
def process_customers(context: OpExecutionContext, storage: MyStorage, customer_path: str):
    """Once customers are staged and infra warm, process them"""
    # dagster captures stdout / stderr automatically, but ops can also
    # emit structured logs to the dagster event log
    context.log.info(f"Running the customers for {datetime.today()}")
    # customers = storage.read_customers(customer_path)
    # some_function_to_process_customers_on_infra(customers)
    ...

# graphs combine ops to create the DAG. graphs can be re-used across
# different jobs, schedules, or nested into larger graphs
# if the graph will only be used once and you want more concise code, you can skip @graph
# and use @job to define a job directly from ops
@graph
def prep_and_run_customers():
    """Warm the infra and get customers in parallel, then process them"""
    # many options exist for defining the dag, see https://docs.dagster.io/concepts/ops-jobs-graphs/graphs
    process_customers(
        infra=start_processing_infrastucture(),
        customer_path=push_customers_to_storage()
    )

# jobs are a specific instance of a graph with associated config
prep_and_run_customer_batch50 = prep_and_run_customers.to_job(
    name="prep_and_run_customer_batch50",
    config=RunConfig(ops={"push_customers_to_storage": MyConfig(batch_size=50)}),
    executor_def=multiprocess_executor,  # executors control how the ops are executed within a run
)

# schedules can be totally customized
@schedule(job=prep_and_run_customer_batch50, cron_schedule="0 0 * * *")
def run_except_holidays(context: ScheduleEvaluationContext):
    scheduled_date = context.scheduled_execution_time.strftime("%Y-%m-%d")
    # if scheduled_date not in get_business_holidays():
    #     return RunRequest()
    ...

def get_env():
    if os.getenv("PROD") is not None:
        return "PROD"
    return "LOCAL"


defs = Definitions(
    jobs=[prep_and_run_customer_batch50],
    schedules=[run_except_holidays],
    resources={
        "storage": MyStorage(
            # determine resource config based on where the code is running
            blob_storage=get_env(),
            # EnvVar is a light wrapper around os.getenv that makes this config
            # show as a secret in the Dagster resource UI
            creds=EnvVar("CREDS"),
        )
    },
)