Skip to content

Instantly share code, notes, and snippets.

@fabito
Created March 5, 2024 08:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fabito/6af6d37f8d976e6008730c031e4953e2 to your computer and use it in GitHub Desktop.
Save fabito/6af6d37f8d976e6008730c031e4953e2 to your computer and use it in GitHub Desktop.
RayClusterResource Dagster resource implementation that provision/deprovision a new Ray cluster on demand
from typing import Optional
from dagster import AssetExecutionContext, ConfigurableResource, Definitions, InitResourceContext, asset
from ray.autoscaler.sdk import create_or_update_cluster, run_on_cluster, teardown_cluster
class RayClusterResource(ConfigurableResource):
cluster_config: str
def setup_for_execution(self, context: InitResourceContext) -> None:
# https://docs.ray.io/en/latest/_modules/ray/autoscaler/sdk/sdk.html#create_or_update_cluster
# TODO suffix ray cluster name with run_id
# context.dagster_run
context.log_manager.info("Starting new Ray cluster")
create_or_update_cluster(cluster_config=self.cluster_config)
def teardown_after_execution(self, context: InitResourceContext) -> None:
# https://docs.ray.io/en/latest/_modules/ray/autoscaler/sdk/sdk.html#teardown_cluster
teardown_cluster(cluster_config=self.cluster_config)
def run(self, cmd: str) -> Optional[str]:
# https://docs.ray.io/en/latest/_modules/ray/autoscaler/sdk/sdk.html#run_on_cluster
return run_on_cluster(cluster_config=self.cluster_config, cmd=cmd)
@asset
def webdataset(ray_cluster: RayClusterResource, context: AssetExecutionContext) -> None:
# recorded metadata can be customized
# metadata = {
# "num_records": len(df),
# "preview": MetadataValue.md(df[["title", "by", "url"]].to_markdown()),
# }
# context.add_output_metadata(metadata=metadata)
ray_cluster.run(cmd="""python3.11 -c 'print("Hello World")'""")
defs = Definitions(
assets=[webdataset],
resources={
"ray_cluster": RayClusterResource(cluster_config="/etc/ray/example_ray_config.yml"),
},
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment