Last active
June 8, 2022 13:17
-
-
Save stkbailey/4bd91c59c9097f210e19a79ad557a046 to your computer and use it in GitHub Desktop.
Create a Simplified Dagster Config
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#%% | |
# | |
from collections import defaultdict | |
from dagster import op, graph, Field, config_mapping, GraphDefinition | |
@op(config_schema={"message": str, "other_stuff": Field(int, 9)}) | |
def print_something(context): | |
print(context.op_config["message"]) | |
@op | |
def dont_print_something(context): | |
print("mmhmm") | |
@graph | |
def thing_graph(): | |
print_something() | |
print_something.alias("foo")() | |
# dont_print_something() | |
def _create_config_schema_from_graph(g: GraphDefinition) -> dict: | |
schema = {} | |
for node in g.solids: | |
node_name = node.name | |
if node.definition.has_config_field: | |
fields = node.definition.get_config_field().config_type.fields | |
for param, val in fields.items(): | |
# if v.default_value_as_json_str | |
schema[f"{node_name}.{param}"] = val | |
return {"inputs": schema} | |
def _defaultdict(): | |
return defaultdict(_defaultdict) | |
def generate_simplified_config_mapping(g: GraphDefinition): | |
custom_schema = _create_config_schema_from_graph(g) | |
@config_mapping(config_schema=custom_schema) | |
def inner(val): | |
config = defaultdict(_defaultdict) | |
for k, v in val.get("inputs", {}).items(): | |
node_name, node_param = k.split(".") | |
config["ops"][node_name]["config"][node_param] = val | |
return config | |
return inner | |
simple_config = generate_simplified_config_mapping(thing_graph) | |
j = thing_graph.to_job(name="thing_job", config=simple_config) | |
if __name__ == "__main__": | |
j.execute_in_process( | |
run_config={ | |
"inputs": { | |
"foo.message": "bar", | |
"print_something.message": "baz", | |
"foo.other_stuff": 9, | |
} | |
} | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment