Skip to content

Instantly share code, notes, and snippets.

@stkbailey
Created March 29, 2022 11:20
Show Gist options
  • Save stkbailey/cbc46397a161aaefa302518b68b9eacb to your computer and use it in GitHub Desktop.
Save stkbailey/cbc46397a161aaefa302518b68b9eacb to your computer and use it in GitHub Desktop.
Dagster Dyanmic Output
# https://docs.dagster.io/_apidocs/dynamic
from dagster import op, job, DynamicOut, DynamicOutput, Field, repository
from random import randrange
@op(config_schema={"directory": Field(str, default_value="./sample")}, out=DynamicOut(str))
def return_list_of_files(context) -> dict:
dir = context.op_config["directory"]
for ii in range(randrange(10)):
yield DynamicOutput(dir + str(ii), mapping_key=str(ii))
@op
def do_something_with_files(context, filename):
context.log.info(filename)
return filename
@op
def summarize(context, filename_list):
context.log.info(filename_list)
return filename_list
@job
def do_many_things():
files = return_list_of_files()
file_results = files.map(do_something_with_files)
agged = summarize(file_results.collect())
@repository
def test_repos():
return [do_many_things]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment