@schrockn
Created January 12, 2019 18:22
This is a step-by-step guide for upgrading from dagster 0.2.x to 0.3.0. The release represents a substantial upgrade in capabilities, but also includes some breaking API changes. Below we detail each change, provide context and reasoning, and give instructions for how to upgrade.

Required API Changes

  1. No more top level config subpackage.

Error:

from dagster import (
ImportError: cannot import name 'config'

We have eliminated the public-facing "config" namespace. You now configure pipeline runs with raw dictionaries instead of a parallel, typed API.

Fix: Simply remove the import. You'll run into related errors later; the following sections address them.
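
A minimal sketch of the fix, assuming a module that previously imported the config namespace alongside other dagster names:

# Before (0.2.x):
#   from dagster import PipelineDefinition, config

# After (0.3.0): drop config entirely; environments are plain dictionaries now
from dagster import PipelineDefinition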

  2. No more dagster.sqlalchemy and dagster.pandas submodules.

Error:

E   ModuleNotFoundError: No module named 'dagster.sqlalchemy'

We have moved the pandas and sqlalchemy code into separate packages (dagster-pandas and dagster-sqlalchemy), so the core dagster library has fewer dependencies.

Fix: Instead of importing dagster.sqlalchemy, pip install dagster-sqlalchemy into your virtual environment and import dagster_sqlalchemy instead.
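
A minimal sketch of the module swap (the alias is arbitrary and only for illustration):

# Before (0.2.x):
#   import dagster.sqlalchemy as dagster_sa

# After (0.3.0): pip install dagster-sqlalchemy into your virtualenv, then:
import dagster_sqlalchemy as dagster_sa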

  3. ConfigDefinition no longer exists.

Error:

ImportError: cannot import name 'ConfigDefinition'

We have eliminated the separate notion of a ConfigDefinition. We realized that the user-provided config on a solid, resource, or context is just a Field of the kind you would use to build a dictionary, so replace ConfigDefinition with Field. (Generally, config_def=ConfigDefinition(...) is now config_field=Field(...).)

Before:

"production": PipelineContextDefinition(
    context_fn=generate_production_execution_context,
    config_def=ConfigDefinition(
        # ...
    )

After:

"production": PipelineContextDefinition(
    context_fn=generate_production_execution_context,
    config_field=Field(
       # ...
    )

  4. New, Simpler Dagster Type Definition API.

Error:

    description='''This represents a path to a file on disk'''
E   TypeError: __init__() got multiple values for argument 'python_type'

Another Error:

E   dagster.check.ParameterCheckError: Param "klass" was supposed to be a type. Got <dagster.core.types.runtime.PythonObjectType object at 0x11e4fbf60> instead of type <class 'dagster.core.types.runtime.PythonObjectType'>

There are now two type-creation APIs: one for defining new types (@dagster_type) and one for annotating existing Python types that you already have (as_dagster_type).

Examples:

@dagster_type(description='This represents a path to a file on disk')
class PathToFile(str):
    pass

S3FileHandle = as_dagster_type(
    namedtuple('S3FileHandle', 'bucket path'),
    description='''
upload_header_to_s3 and upload_service_line_to_s3 both result in files
being uploaded to s3. Hence the "output" of those two solids is a handle
to a file. The following stages take those as their inputs to create
redshift tables out of them.

Properties:
    - bucket: String
    - path: String
        '''
)

Note you can use S3FileHandle and PathToFile as if they were just "normal types" as well.
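
For instance, given the definitions above, both behave like ordinary Python values (a sketch; the literal values are made up):

# PathToFile is still a str subclass
path = PathToFile('/tmp/allscripts/input.zip')
assert isinstance(path, str)

# S3FileHandle is still a namedtuple
handle = S3FileHandle(bucket='example-bucket', path='raw/input.csv')
assert handle.bucket == 'example-bucket'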

  5. ConfigDictionary --> Dict

We have a much less verbose API for building configuration schema:

Error:

E   AttributeError: module 'dagster.core.types' has no attribute 'ConfigDictionary'

First, we now discourage use of the types namespace; instead, just from dagster import Dict. Second, ConfigDictionary is now just Dict. Third, you no longer have to name it. The net result is much nicer:

Before:

types.ConfigDictionary(
    'DefaultContextConfig',
    {
        'data_source_run_id' : Field(types.String, description='''
            This is a run id generated by the caller of this pipeline. Right
            now this is required to tie a single run id to multiple executions
            of the same pipeline.
        '''),
        'conf' : Field(types.Any),
    },
)

After:

Dict({
    'data_source_run_id' : Field(String, description='''
        This is a run id generated by the caller of this pipeline. Right
        now this is required to tie a single run id to multiple executions
        of the same pipeline.
    '''),
    'conf' : Field(Any),
})

This is a fairly mechanical transition.
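
For reference, the After snippet assumes these names are imported from the top-level dagster package (as "from dagster import Dict" above implies):

from dagster import Any, Dict, Field, String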

  6. define_stub_solid no longer in top-level dagster

This is now an internal utility function. If you really, really need it:

from dagster.core.utility_solids import define_stub_solid
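
Its behavior is unchanged: it defines a solid that emits a hard-coded value, as in the "Before (with stubbed inputs)" testing example further below. A small sketch (the name and value here are illustrative):

from dagster.core.utility_solids import define_stub_solid

# A solid named 'unzipped_path_value' whose output is the given literal value
stub = define_stub_solid('unzipped_path_value', '/tmp/unzipped.csv')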

  7. Environments are raw dictionaries rather than config.* classes

Per change 1 above, the config classes are no longer public and are no longer passed to execute_pipeline or similar functions. Use raw dictionaries instead, shaped exactly like the yaml files.

Before:

    environment = config.Environment(
        context=config.Context(
            name='unittest',
            config={
                'data_source_run_id': str(uuid.uuid4()),
                'conf': CONF,
                'log_level': 'ERROR',
                'cleanup_files': False,
            }
        ),
        solids={
            'unzip_file': config.Solid({
                'zipped_file': ZIP_FILE_PATH
            }),
        },
    )

After:

    environment = { 
        'context':{
            'unittest' : {
                'config' : {
                    'data_source_run_id': str(uuid.uuid4()),
                    'conf': CONF,
                    'log_level': 'ERROR',
                    'cleanup_files': False,
                }
            }
        },
        'solids': {
            'unzip_file': {
                'config' : {
                    'zipped_file': ZIP_FILE_PATH,
                }
            }
        }
    }

While this provides fewer guarantees within the Python type system, the API results in very high-quality error checking and messaging from the dagster config schema.
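
A sketch of passing the raw dictionary to execution, reusing the hypothetical pipeline from the testing examples below:

    from dagster import execute_pipeline

    result = execute_pipeline(define_allscripts_fileload_pipeline(), environment)
    assert result.success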

  8. New testing APIs

Error:

 AttributeError: type object 'PipelineDefinition' has no attribute 'create_sub_pipeline'

or

AttributeError: type object 'PipelineDefinition' has no attribute 'create_single_solid_pipeline'

The creation of "sub" and "single_solid" pipelines was awkward and error-prone. Instead we have the new functions execute_solid and execute_solids.

Before:


    pipeline = PipelineDefinition.create_single_solid_pipeline(
        define_allscripts_fileload_pipeline(),
        'unzip_file',
    )

    result = execute_pipeline(pipeline, environment)

    assert result.success
    assert os.path.exists(
        result.result_for_solid('unzip_file').transformed_value())

After:

    solid_result = execute_solid(
        define_allscripts_fileload_pipeline(),
        'unzip_file',
        environment=environment
    )

    assert solid_result.success
    assert os.path.exists(solid_result.transformed_value())

Before (with stubbed inputs):

    pipeline = PipelineDefinition.create_single_solid_pipeline(
        define_allscripts_fileload_pipeline(),
        'split_headers_and_service_lines',
        {
            'split_headers_and_service_lines': {
                'unzipped_file':
                define_stub_solid('unzipped_path_value', unzipped_path)
            }
        },
    )

    result = execute_pipeline(pipeline, environment)

    assert result.success
    solid_result = result.result_for_solid('split_headers_and_service_lines')
    assert os.path.exists(solid_result.transformed_value('header_file'))
    assert os.path.exists(solid_result.transformed_value('service_lines_file'))

After (with stubbed inputs):

    solid_result = execute_solid(
        define_allscripts_fileload_pipeline(),
        'split_headers_and_service_lines',
        inputs={
            'unzipped_file': unzipped_path,
        },
        environment=environment,
    )

    assert os.path.exists(solid_result.transformed_value('header_file'))
    assert os.path.exists(solid_result.transformed_value('service_lines_file'))

Before (subset execution):

    pipeline = PipelineDefinition.create_sub_pipeline(
        define_allscripts_fileload_pipeline(),
        ['unzip_file'],
        ['split_headers_and_service_lines'],
        {},
    )

    result = execute_pipeline(pipeline, environment)


    assert result.success
    solid_result = result.result_for_solid('split_headers_and_service_lines')
    snapshot_check_results(snapshot, solid_result)

After (subset execution):

    result_dict = execute_solids(
        define_allscripts_fileload_pipeline(),
        ['unzip_file', 'split_headers_and_service_lines'],
        environment=environment,
    )

    snapshot_check_results(snapshot, result_dict['split_headers_and_service_lines'])

  9. Execution Context Lifecycle Changes

Error:

AttributeError: 'ExecutionContext' object has no attribute 'value'

This is officially the most difficult change, conceptually. We changed the system so that the ExecutionContext passed around to your solids (now RuntimeExecutionContext) is constructed by the system rather than the user. The ExecutionContext object the user creates can be thought of as RuntimeExecutionContextParams, although that was excessively verbose.

Before:

    with context.value('data_source_run_id', data_source_run_id),\
        context.value('data_source', 'allscripts'),\
        context.value('pipeline_run_id', pipeline_run_id):

        yield ExecutionContext(
            loggers=[define_colored_console_logger('dagster', log_level)],
            resources=resources
        )

After:

    yield ExecutionContext(
        loggers=[define_colored_console_logger('dagster', log_level)],
        resources=resources,
        context_stack={
            'data_source_run_id': data_source_run_id,
            'data_source': 'allscripts',
            'pipeline_run_id': pipeline_run_id,
        },
    )