Skip to content

Instantly share code, notes, and snippets.

@slopp
Created January 23, 2024 16:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save slopp/9cc7b6462b7aa958eb0687e0ef7c94c4 to your computer and use it in GitHub Desktop.
Save slopp/9cc7b6462b7aa958eb0687e0ef7c94c4 to your computer and use it in GitHub Desktop.
Dagster with a custom DSL
# fn_profile.yaml -- loaded via get_stocks_dsl_example_defs("fn_profile.yaml").
# Nesting matters: the loader reads profile.name, stocks_to_index[*].ticker,
# index_strategy.type and forecast.days.
profile:
  name: FN
stocks_to_index:
  - ticker: NFLX
  - ticker: META
index_strategy:
  type: equal
forecast:
  days: 60
# mag_profile.yaml -- loaded via get_stocks_dsl_example_defs("mag_profile.yaml").
# Nesting matters: the loader reads profile.name, stocks_to_index[*].ticker,
# index_strategy.type and forecast.days.
profile:
  name: MAG
stocks_to_index:
  - ticker: MSFT
  - ticker: AAPL
  - ticker: GOOG
index_strategy:
  type: weighted_average
forecast:
  days: 30
import os
import shutil
from typing import Any, Dict, List, NamedTuple
import yaml
from dagster._core.execution.context.compute import AssetExecutionContext
try:
from yaml import CLoader as Loader
except ImportError:
from yaml import Loader
from dagster import AssetKey, AssetsDefinition, asset, file_relative_path, multi_asset, Definitions
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.pipes.subprocess import PipesSubprocessClient
def load_yaml(relative_path: str) -> Dict[str, Any]:
    """Load a YAML document that lives next to this module.

    Args:
        relative_path: Path of the YAML file relative to the directory that
            contains this module. An absolute path is used unchanged
            (``os.path.join`` semantics).

    Returns:
        The parsed top-level mapping of the document.

    Raises:
        ValueError: If the document is empty or its top level is not a
            mapping (the callers index into it by key).
    """
    path = os.path.join(os.path.dirname(__file__), relative_path)
    with open(path, "r", encoding="utf8") as ff:
        # NOTE(review): ``Loader`` is the module-level CLoader/Loader picked at
        # import time; both are PyYAML's full (unsafe) loaders, which can build
        # arbitrary Python objects from tagged input. That is acceptable for
        # profile files shipped with this module, but switch to a SafeLoader
        # before pointing this at files from an untrusted source.
        document = yaml.load(ff, Loader=Loader)
    # An empty file yields None; a list or scalar document would otherwise
    # fail later with an unhelpful TypeError in the DSL parser.
    if not isinstance(document, dict):
        raise ValueError(
            f"Expected a YAML mapping at the top level of {path!r}, "
            f"got {type(document).__name__}"
        )
    return document
def get_ticker_data(ticker: str) -> str:
    """Return placeholder raw data for ``ticker``.

    Stand-in for a request to an external data service; for now it just
    appends ``-data`` to the ticker.
    """
    return "{}-data".format(ticker)
def enrich_and_insert_data(ticker_data) -> None:
    """Enrich ``ticker_data`` and store it in the database (placeholder).

    Currently a no-op: the argument is ignored and ``None`` is returned.
    """
    return None
def fetch_data_for_ticker(ticker: str) -> str:
    """Return placeholder stored (enriched) data for ``ticker``.

    Stand-in for a database read; for now it appends ``-data-enriched``
    to the ticker.
    """
    return "{}-data-enriched".format(ticker)
class StockInfo(NamedTuple):
    """One entry of a profile's ``stocks_to_index`` list."""

    ticker: str  # ticker symbol as written in the DSL, e.g. "MSFT"
class IndexStrategy(NamedTuple):
    """The ``index_strategy`` block of a profile."""

    type: str  # strategy name from the DSL, e.g. "equal" or "weighted_average"
class Forecast(NamedTuple):
    """The ``forecast`` block of a profile."""

    days: int  # value of forecast.days in the DSL (e.g. 60)
class Profile(NamedTuple):
    """The ``profile`` block of a stocks-DSL document."""

    name: str  # profile name; used as the Dagster group name and key prefix
class StockAssets(NamedTuple):
    """A fully parsed stocks-DSL profile, ready to be turned into assets."""

    stock_infos: List[StockInfo]  # one entry per ticker, in DSL order
    index_strategy: IndexStrategy  # how the tickers are combined into an index
    forecast: Forecast  # forecast settings (days)
    profile: Profile  # profile metadata (name)
def build_stock_assets_object(stocks_dsl_document: Dict[str, Dict]) -> StockAssets:
    """Convert a parsed stocks-DSL document into a ``StockAssets`` record.

    Reads ``stocks_to_index[*].ticker``, ``index_strategy.type``,
    ``forecast.days`` (coerced with ``int``) and ``profile.name``; a missing
    key raises ``KeyError``.
    """
    infos: List[StockInfo] = []
    for entry in stocks_dsl_document["stocks_to_index"]:
        infos.append(StockInfo(ticker=entry["ticker"]))

    # Lookups run in the same order as before, so the first missing key
    # reported is unchanged.
    strategy = IndexStrategy(type=stocks_dsl_document["index_strategy"]["type"])
    horizon = Forecast(int(stocks_dsl_document["forecast"]["days"]))
    owner = Profile(stocks_dsl_document["profile"]["name"])

    return StockAssets(
        stock_infos=infos,
        index_strategy=strategy,
        forecast=horizon,
        profile=owner,
    )
def get_stocks_dsl_example_defs(yaml_path: str) -> List[AssetsDefinition]:
    """Build the asset definitions described by one stocks-DSL YAML file.

    ``yaml_path`` is resolved by ``load_yaml`` relative to this module's
    directory; the parsed document is converted with
    ``build_stock_assets_object`` and expanded by
    ``assets_defs_from_stock_assets``.
    """
    return assets_defs_from_stock_assets(
        build_stock_assets_object(load_yaml(yaml_path))
    )
def assets_defs_from_stock_assets(stock_assets: StockAssets) -> List[AssetsDefinition]:
    """Build the Dagster asset graph for one stock-index profile.

    All returned definitions are placed in a group named after the profile:

    1. ``fetch_the_tickers`` -- a multi-asset with one asset per ticker,
       keyed by the bare ticker symbol (e.g. ``["MSFT"]``);
    2. ``index_strategy`` -- prefixed with the profile name, depending on
       every ticker asset;
    3. ``forecast`` -- prefixed with the profile name, depending on
       ``index_strategy``.

    Args:
        stock_assets: Parsed profile (tickers, index strategy, forecast, name).

    Returns:
        ``[fetch_the_tickers, index_strategy, forecast]`` for this profile.
    """
    group_name = stock_assets.profile.name
    tickers = [stock_info.ticker for stock_info in stock_assets.stock_infos]
    forecast_days = stock_assets.forecast.days
    index_strategy_type = stock_assets.index_strategy.type

    def spec_for_stock_info(stock_info: StockInfo) -> AssetSpec:
        # One AssetSpec per ticker for the multi-asset below.
        ticker = stock_info.ticker
        # NOTE(review): ticker keys are not prefixed with the profile name, so
        # two profiles listing the same ticker would declare the same asset
        # key twice -- presumably rejected by Definitions; confirm before
        # adding overlapping profiles.
        return AssetSpec(
            key=AssetKey([ticker]),
            group_name=group_name,
            description=f"Fetch {ticker} from internal service",
        )

    ticker_specs = [spec_for_stock_info(stock_info) for stock_info in stock_assets.stock_infos]

    @multi_asset(
        specs=ticker_specs,
        # Prefixed with the profile name so each profile's op gets a distinct name.
        name=f"{group_name}_fetch_the_tickers",
    )
    def fetch_the_tickers(context: AssetExecutionContext):
        context.log.info(f"Got tickers: {tickers}")
        # Fetch the raw data for each ticker and load it into the database
        # (both helpers are currently placeholders).
        for ticker in tickers:
            enrich_and_insert_data(get_ticker_data(ticker))

    @asset(
        deps=fetch_the_tickers.keys,
        group_name=group_name,
        key_prefix=f"{group_name}",
    )
    def index_strategy(context: AssetExecutionContext) -> None:
        stored_ticker_data = {ticker: fetch_data_for_ticker(ticker) for ticker in tickers}
        # TODO: build the index from stored_ticker_data according to
        # index_strategy_type; for now the strategy is only recorded as metadata.
        context.add_output_metadata({
            "index strategy": index_strategy_type
        })

    @asset(
        # ``deps`` takes an iterable of dependencies, so wrap the single key in
        # a list. The bare AssetKey previously passed here only worked by
        # accident (iterating the AssetKey NamedTuple yields its path).
        deps=[index_strategy.key],
        group_name=group_name,
        key_prefix=f"{group_name}",
    )
    def forecast(context: AssetExecutionContext) -> None:
        # TODO: produce the actual forecast over forecast_days.
        context.add_output_metadata({
            "forecast days": forecast_days
        })

    return [fetch_the_tickers, index_strategy, forecast]
# Code-location entry point: every asset from both DSL profiles, MAG first.
defs = Definitions(
    assets=[
        assets_def
        for profile_file in ("mag_profile.yaml", "fn_profile.yaml")
        for assets_def in get_stocks_dsl_example_defs(profile_file)
    ]
)
@slopp
Copy link
Author

slopp commented Jan 23, 2024

Screen Shot 2024-01-23 at 9 00 54 AM

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment