Skip to content

Instantly share code, notes, and snippets.

@stkbailey
Last active June 8, 2022 13:17
Show Gist options
  • Save stkbailey/4bd91c59c9097f210e19a79ad557a046 to your computer and use it in GitHub Desktop.
Save stkbailey/4bd91c59c9097f210e19a79ad557a046 to your computer and use it in GitHub Desktop.
Create a Simplified Dagster Config
#%%
#
from collections import defaultdict
from dagster import op, graph, Field, config_mapping, GraphDefinition
@op(config_schema={"message": str, "other_stuff": Field(int, 9)})
def print_something(context):
    """Print the ``message`` value from this op's config.

    The config schema also declares ``other_stuff`` (an int field declared
    as ``Field(int, 9)``), which this op does not read.
    """
    message = context.op_config["message"]
    print(message)
@op
def dont_print_something(context):
    """Print a fixed string; takes no config."""
    reply = "mmhmm"
    print(reply)
@graph
def thing_graph():
    """Graph that invokes ``print_something`` twice.

    The first invocation keeps the op's own name; the second is aliased to
    ``foo`` so the two nodes have distinct names.
    """
    print_something()
    aliased_print = print_something.alias("foo")
    aliased_print()
    # dont_print_something()  # left disabled, as in the original
def _create_config_schema_from_graph(g: GraphDefinition) -> dict:
    """Flatten the config fields of every node in *g* into one schema.

    Each field of each node that has a config field is exposed under the
    key ``"<node name>.<field name>"``; nodes without a config field are
    skipped. The flattened mapping is returned nested under ``"inputs"``.
    """
    flattened = {
        f"{node.name}.{field_name}": field
        for node in g.solids
        if node.definition.has_config_field
        for field_name, field in (
            node.definition.get_config_field().config_type.fields.items()
        )
    }
    return {"inputs": flattened}
def _defaultdict():
    """Return a defaultdict whose missing keys are themselves such dicts.

    Using this function as its own default factory lets callers assign to
    arbitrarily deep key paths without creating the intermediate levels.
    """
    nested = defaultdict(_defaultdict)
    return nested
def generate_simplified_config_mapping(g: GraphDefinition):
    """Build a config mapping that lets *g* be configured with flat keys.

    The outer schema comes from ``_create_config_schema_from_graph``: one
    ``"<node name>.<param>"`` entry per op config field, all nested under
    ``"inputs"``. The returned mapping expands those flat keys back into
    the nested ``{"ops": {<node name>: {"config": {<param>: value}}}}``
    layout.

    Args:
        g: The graph whose op config fields should be exposed.

    Returns:
        The ``config_mapping``-decorated function, suitable for passing as
        ``config=`` to ``g.to_job(...)``.
    """
    custom_schema = _create_config_schema_from_graph(g)

    @config_mapping(config_schema=custom_schema)
    def inner(val):
        config = {}
        for key, value in val.get("inputs", {}).items():
            # Keys are built as f"{node_name}.{param}". Split on the first
            # dot only so a dot inside the parameter name does not break
            # the unpacking (assumes node names contain no dots).
            node_name, node_param = key.split(".", 1)
            op_config = (
                config.setdefault("ops", {})
                .setdefault(node_name, {})
                .setdefault("config", {})
            )
            # Store this parameter's own value, not the whole incoming
            # config dict.
            op_config[node_param] = value
        return config

    return inner
# Flat-key config mapping for thing_graph, and a job that uses it as config.
simple_config = generate_simplified_config_mapping(thing_graph)
j = thing_graph.to_job(
    name="thing_job",
    config=simple_config,
)
if __name__ == "__main__":
    # Run the job in-process with the flat "<node>.<param>" keys.
    run_config = {
        "inputs": {
            "foo.message": "bar",
            "print_something.message": "baz",
            "foo.other_stuff": 9,
        }
    }
    j.execute_in_process(run_config=run_config)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment