0.2.8 --> 0.3.0 Upgrade Guide

Upgrading to 0.3.0

This is a step-by-step guide for upgrading from dagster 0.2.x to 0.3.0. This release represents a substantial upgrade in capabilities but also includes some breaking API changes. We'll detail them, provide context and reasoning, and give instructions for how to upgrade.

Required API Changes

  1. No more top-level config subpackage.

Error:

from dagster import (
ImportError: cannot import name 'config'

We have eliminated the public-facing config namespace. You now use raw dictionaries, rather than a parallel, typed API, to configure pipeline runs.

Fix: Simply remove the import. You'll run into related errors later in this guide.
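For example, a minimal sketch of the import change (the other imported name here is illustrative):

Before:

from dagster import (
    config,
    solid,
)

After:

from dagster import (
    solid,
)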

  2. No more dagster.sqlalchemy and dagster.pandas submodules.

Error:

E   ModuleNotFoundError: No module named 'dagster.sqlalchemy'

We have moved the pandas and sqlalchemy code into their own separate modules (dagster-pandas and dagster-sqlalchemy). This gives the core dagster library fewer dependencies.

Fix: Instead of importing dagster.sqlalchemy, pip install dagster-sqlalchemy into your virtual environment and import dagster_sqlalchemy instead.
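A sketch of that change (the alias is illustrative):

pip install dagster-sqlalchemy

Before:

import dagster.sqlalchemy as dagster_sa

After:

import dagster_sqlalchemy as dagster_sa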

  3. ConfigDefinition no longer exists.

Error:

ImportError: cannot import name 'ConfigDefinition'

We have eliminated the separate notion of a ConfigDefinition. Instead, we realized that the user-provided config in a solid, resource, or context is just a Field that you would use to build a Dict or Selector. So replace ConfigDefinition with Field. (Generally, config_def=ConfigDefinition(...) is now config_field=Field(...).)

Before:

"production": PipelineContextDefinition(
    context_fn=generate_production_execution_context,
    config_def=ConfigDefinition(
        # ...
    ),
),

After:

"production": PipelineContextDefinition(
    context_fn=generate_production_execution_context,
    config_field=Field(
       # ...
    ),
),

  4. New, Simpler Dagster Type Definition API.

Error:

    description='''This represents a path to a file on disk'''
E   TypeError: __init__() got multiple values for argument 'python_type'

Another Error:

E   dagster.check.ParameterCheckError: Param "klass" was supposed to be a type. Got <dagster.core.types.runtime.PythonObjectType object at 0x11e4fbf60> instead of type <class 'dagster.core.types.runtime.PythonObjectType'>

There are now two different type-creation APIs: one for creating new types (@dagster_type) and one for annotating existing types that you import (as_dagster_type).

Examples:

@dagster_type(description='This represents a path to a file on disk')
class PathToFile(str):
    pass

S3FileHandle = as_dagster_type(
    namedtuple('S3FileHandle', 'bucket path'),
    description='''
upload_header_to_s3 and upload_service_line_to_s3 both result in files
being uploaded to s3. Hence the "output" of those two solids is a handle
to a file. The following stages take those as their inputs to create
redshift tables out of them.

Properties:
    - bucket: String
    - path: String
        '''
)

Note that you can use S3FileHandle and PathToFile as if they were just "normal types" as well.
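For instance, a sketch of using them as input and output types on a hypothetical solid (assuming the standard InputDefinition/OutputDefinition and @solid APIs; the solid body is illustrative):

from dagster import InputDefinition, OutputDefinition, solid

@solid(
    inputs=[InputDefinition('path_to_file', dagster_type=PathToFile)],
    outputs=[OutputDefinition(dagster_type=S3FileHandle)],
)
def upload_file_to_s3(info, path_to_file):
    # upload logic elided; return a handle describing where the file landed
    return S3FileHandle(bucket='some-bucket', path=path_to_file)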

  5. ConfigDictionary --> Dict

We now have a much less verbose API for building configuration schemas.

Error:

E   AttributeError: module 'dagster.core.types' has no attribute 'ConfigDictionary'

First, we now discourage use of the types namespace; instead, just from dagster import Dict (or whichever class you need) directly. Second, ConfigDictionary is now just Dict. Third, you no longer have to name it. The net result is much nicer:

Before:

types.ConfigDictionary(
    'DefaultContextConfig',
    {
        'data_source_run_id' : Field(types.String, description='''
            This is a run id generated by the caller of this pipeline. Right
            now this is required to tie a single run id to multiple executions
            of the same pipeline.
        '''),
        'conf' : Field(types.Any),
    },
)

After:

Dict({
    'data_source_run_id' : Field(String, description='''
        This is a run id generated by the caller of this pipeline. Right
        now this is required to tie a single run id to multiple executions
        of the same pipeline.
    '''),
    'conf' : Field(Any),
})

This is a fairly mechanical transition.
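Note that the names used above now come straight from the top-level package (assuming, as the example implies, they are all exported there):

from dagster import Any, Dict, Field, String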

  6. define_stub_solid no longer in top-level dagster

This is now an internal utility function. If you really, really need it:

from dagster.core.utility_solids import define_stub_solid
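It defines a solid with the given name whose only job is to output the given value, which is how it stubs inputs in the 0.2.x-style tests shown below (the path here is illustrative):

stub = define_stub_solid('unzipped_path_value', '/tmp/unzipped_file.csv')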

  7. Environments are raw dictionaries rather than config.* classes

Per change 1, the config classes are no longer public or used in the execute_pipeline family of APIs. Use raw dictionaries instead. They should be shaped exactly like the corresponding YAML files.

Before:

    environment = config.Environment(
        context=config.Context(
            name='unittest',
            config={
                'data_source_run_id': str(uuid.uuid4()),
                'conf': CONF,
                'log_level': 'ERROR',
                'cleanup_files': False,
            }
        ),
        solids={
            'unzip_file': config.Solid({
                'zipped_file': ZIP_FILE_PATH
            }),
        },
    )

After:

    environment = {
        'context':{
            'unittest' : {
                'config' : {
                    'data_source_run_id': str(uuid.uuid4()),
                    'conf': CONF,
                    'log_level': 'ERROR',
                    'cleanup_files': False,
                }
            }
        },
        'solids': {
            'unzip_file': {
                'config' : {
                    'zipped_file': ZIP_FILE_PATH,
                }
            }
        }
    }

While providing fewer guarantees within the Python type system, this API results in very high-quality error checking and messaging from the dagster config schema.
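The dictionary is passed wherever the config.Environment object previously went; a minimal sketch, using the pipeline function from the testing examples below:

    result = execute_pipeline(define_fileload_pipeline(), environment)
    assert result.success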

  8. New testing APIs

Error:

 AttributeError: type object 'PipelineDefinition' has no attribute 'create_sub_pipeline'

or

AttributeError: type object 'PipelineDefinition' has no attribute 'create_single_solid_pipeline'

The creation of "sub" and "single_solid" pipelines was awkward and error-prone. Instead we have the new functions execute_solid and execute_solids. You can now execute a single solid with a single function call.

Before:

    pipeline = PipelineDefinition.create_single_solid_pipeline(
        define_fileload_pipeline(),
        'unzip_file',
    )

    result = execute_pipeline(pipeline, environment)

    assert result.success
    assert os.path.exists(
        result.result_for_solid('unzip_file').transformed_value())

After:

    solid_result = execute_solid(
        define_fileload_pipeline(),
        'unzip_file',
        environment=environment
    )

    assert solid_result.success
    assert os.path.exists(solid_result.transformed_value())

Before (with stubbed inputs):

    pipeline = PipelineDefinition.create_single_solid_pipeline(
        define_fileload_pipeline(),
        'split_headers_and_service_lines',
        {
            'split_headers_and_service_lines': {
                'unzipped_file':
                define_stub_solid('unzipped_path_value', unzipped_path)
            }
        },
    )

    result = execute_pipeline(pipeline, environment)

    assert result.success
    solid_result = result.result_for_solid('split_headers_and_service_lines')
    assert os.path.exists(solid_result.transformed_value('header_file'))
    assert os.path.exists(solid_result.transformed_value('service_lines_file'))

After (with stubbed inputs):

    solid_result = execute_solid(
        define_fileload_pipeline(),
        'split_headers_and_service_lines',
        inputs={
            'unzipped_file': unzipped_path,
        },
        environment=environment,
    )

    assert os.path.exists(solid_result.transformed_value('header_file'))
    assert os.path.exists(solid_result.transformed_value('service_lines_file'))

Before (subset execution):

    pipeline = PipelineDefinition.create_sub_pipeline(
        define_fileload_pipeline(),
        ['unzip_file'],
        ['split_headers_and_service_lines'],
        {},
    )

    result = execute_pipeline(pipeline, environment)


    assert result.success
    solid_result = result.result_for_solid('split_headers_and_service_lines')
    snapshot_check_results(snapshot, solid_result)

After (subset execution):

    result_dict = execute_solids(
        define_fileload_pipeline(),
        ['unzip_file', 'split_headers_and_service_lines'],
        environment=environment,
    )

    snapshot_check_results(snapshot, result_dict['split_headers_and_service_lines'])

  9. Execution Context Lifecycle Changes

Error:

AttributeError: 'ExecutionContext' object has no attribute 'value'

This is officially the most difficult change, conceptually. We changed the system so that the ExecutionContext passed around to your solids (now RuntimeExecutionContext) is constructed by the system rather than the user. The ExecutionContext object the user creates can be thought of as RuntimeExecutionContextParams. We opted against that name because it was excessively verbose.

Before:

    with context.value('data_source_run_id', data_source_run_id),\
        context.value('data_source', 'new_data'),\
        context.value('pipeline_run_id', pipeline_run_id):

        yield ExecutionContext(
            loggers=[define_colored_console_logger('dagster', log_level)],
            resources=resources
        )

After:

    yield ExecutionContext(
        loggers=[define_colored_console_logger('dagster', log_level)],
        resources=resources,
        context_stack={
            'data_source_run_id': data_source_run_id,
            'data_source': 'new_data',
            'pipeline_run_id': pipeline_run_id,
        },
    )

  10. Non-null by default

Error:

E   dagster.core.errors.DagsterTypeError: Solid solid_name input input_name received value None which does not pass the typecheck for Dagster type PandasDataFrame. Step solid_name.transform

You have encountered a type error. This is likely because in 0.2.8 types accepted None by default, and this is no longer true in 0.3.0. You now have to opt in to accepting nulls.

Before:

@solid(outputs=[OutputDefinition(dagster_type=dagster_pd.DataFrame)])
def return_none(info):
    return None # None no longer allowed, would break at runtime

After:

@solid(outputs=[OutputDefinition(dagster_type=Nullable(dagster_pd.DataFrame))])
def return_none(info):
    return None # Because of Nullable wrapper, this is ok
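For completeness, the names used in the snippet above would be imported roughly like this (assuming Nullable is exposed at the top level like the other types, and dagster-pandas is installed per change 2):

from dagster import Nullable, OutputDefinition, solid
import dagster_pandas as dagster_pd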