Skip to content

Instantly share code, notes, and snippets.

@helloworld
Last active July 1, 2020 12:37
Show Gist options
  • Save helloworld/f4d4e898f94528315f49be64b7887305 to your computer and use it in GitHub Desktop.
Save helloworld/f4d4e898f94528315f49be64b7887305 to your computer and use it in GitHub Desktop.
from dagster import pipeline, solid, RepositoryDefinition, InputDefinition, execute_pipeline_iterator, DagsterInstance
@solid(input_defs=[InputDefinition("number", int)])
def my_solid(context, number):
context.log.info("Number: {}".format(number))
# Stop condition to prevent infinite recursion
if number == 5:
return
for event in execute_pipeline_iterator(my_pipeline, instance=DagsterInstance.get(), environment_dict={
'solids': {
'my_solid': {
'inputs': {
'number': number + 1
}
}
}
}):
context.log.info(str(event)) # or simply call pass
@pipeline
def my_pipeline():
my_solid()
def define_repository():
return RepositoryDefinition("my_repository", pipeline_defs=[my_pipeline])
@helloworld
Copy link
Author

Alternatively, using execute_pipeline:

from dagster import pipeline, solid, RepositoryDefinition, InputDefinition, execute_pipeline, DagsterInstance


@solid(input_defs=[InputDefinition("number", int)])
def my_solid(context, number):
    context.log.info("Number: {}".format(number))

    # Stop condition to prevent infinite recursion
    if number == 5:
        return

    execute_pipeline(my_pipeline, instance=DagsterInstance.get(), environment_dict={
        'solids': {
            'my_solid': {
                'inputs': {
                    'number': number + 1
                }
            }
        }
    })


@pipeline
def my_pipeline():
    my_solid()


def define_repository():
    return RepositoryDefinition("my_repository", pipeline_defs=[my_pipeline])

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment