This is a step-by-step guide for upgrading from dagster 0.2.x to 0.3.0. This release represents a substantial upgrade in capabilities, but it also includes some breaking API changes. We'll detail each change, provide context and reasoning, and give instructions for upgrading.
- No more top-level config subpackage.
Error:
from dagster import (
ImportError: cannot import name 'config'
We have eliminated the public-facing "config" namespace. (You now use raw dictionaries, rather than a parallel, typed API, to configure pipeline runs.)
Fix: Simply remove the import. You'll run into related errors later in this guide.
- No more dagster.sqlalchemy and dagster.pandas submodules.
Error:
E ModuleNotFoundError: No module named 'dagster.sqlalchemy'
We have moved the pandas and sqlalchemy code into separate modules (dagster-pandas and dagster-sqlalchemy). This gives the core dagster library fewer dependencies.
Fix: Instead of importing dagster.sqlalchemy, pip install dagster-sqlalchemy into your virtual environment and then import dagster_sqlalchemy instead.
- ConfigDefinition no longer exists.
Error:
ImportError: cannot import name 'ConfigDefinition'
We have eliminated the separate notion of a ConfigDefinition. Instead, we realized that the user-provided config on a solid, resource, or context is just a Field, the same kind you would use to build a Dict or Selector. So replace ConfigDefinition with Field. (Generally, config_def=ConfigDefinition is now config_field=Field.)
Before:
"production": PipelineContextDefinition(
context_fn=generate_production_execution_context,
config_def=ConfigDefinition(
# ...
)
After:
"production": PipelineContextDefinition(
context_fn=generate_production_execution_context,
config_field=Field(
# ...
)
- New, Simpler Dagster Type Definition API.
Error:
description='''This represents a path to a file on disk'''
E TypeError: __init__() got multiple values for argument 'python_type'
Another Error:
E dagster.check.ParameterCheckError: Param "klass" was supposed to be a type. Got <dagster.core.types.runtime.PythonObjectType object at 0x11e4fbf60> instead of type <class 'dagster.core.types.runtime.PythonObjectType'>
There are now two type-creation APIs: one for creating new types, and one for annotating existing types that you import.
Examples:
@dagster_type(description='This represents a path to a file on disk')
class PathToFile(str):
pass
S3FileHandle = as_dagster_type(
namedtuple('S3FileHandle', 'bucket path'),
description='''
upload_header_to_s3 and upload_service_line_to_s3 both result in files
being uploaded to s3. Hence the "output" of those two solids is a handle
to a file. The following stages take those as their inputs to create
redshift tables out of them.
Properties:
- bucket: String
- path: String
'''
)
Note you can use S3FileHandle and PathToFile as if they were just "normal types" as well.
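To illustrate that last point: because @dagster_type decorates a plain Python class and as_dagster_type wraps an existing one, instances behave like ordinary Python values. Here is a plain-Python sketch (without the dagster imports, which the decorators layer on top):

```python
from collections import namedtuple

# Sketch without dagster: @dagster_type returns the decorated class itself,
# and as_dagster_type returns the wrapped type, so both remain usable as
# ordinary Python types.
class PathToFile(str):  # would carry @dagster_type in real code
    pass

# would be wrapped by as_dagster_type in real code
S3FileHandle = namedtuple('S3FileHandle', 'bucket path')

path = PathToFile('/uploads/headers.csv')
handle = S3FileHandle(bucket='my-bucket', path='headers/2019.csv')

print(path.endswith('.csv'))  # str methods still work
print(handle.bucket)
```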
- ConfigDictionary --> Dict
We have a much less verbose API for building configuration schema:
Error:
E AttributeError: module 'dagster.core.types' has no attribute 'ConfigDictionary'
First, we now discourage use of the types namespace. Instead, import classes directly: from dagster import Dict.
Second, ConfigDictionary is now just Dict.
Third, you no longer have to name it. The net result is much nicer:
Before:
types.ConfigDictionary(
'DefaultContextConfig',
{
'data_source_run_id' : Field(types.String, description='''
This is a run id generated by the caller of this pipeline. Right
now this is required to tie a single run id to multiple executions
of the same pipeline.
'''),
'conf' : Field(types.Any),
},
)
After:
Dict({
'data_source_run_id' : Field(String, description='''
This is a run id generated by the caller of this pipeline. Right
now this is required to tie a single run id to multiple executions
of the same pipeline.
'''),
'conf' : Field(Any),
})
This is a fairly mechanical transition.
- define_stub_solid no longer in top-level dagster
This is now an internal utility function. If you really, really need it:
from dagster.core.utility_solids import define_stub_solid
- Environments are raw dictionaries rather than config.* classes
Per the first item above, the config classes are no longer public and are no longer used in the execute_pipeline family of APIs. Use raw dictionaries instead. They should be shaped exactly like the yaml files.
Before:
environment = config.Environment(
context=config.Context(
name='unittest',
config={
'data_source_run_id': str(uuid.uuid4()),
'conf': CONF,
'log_level': 'ERROR',
'cleanup_files': False,
}
),
solids={
'unzip_file': config.Solid({
'zipped_file': ZIP_FILE_PATH
}),
},
)
After:
environment = {
'context':{
'unittest' : {
'config' : {
'data_source_run_id': str(uuid.uuid4()),
'conf': CONF,
'log_level': 'ERROR',
'cleanup_files': False,
}
}
},
'solids': {
'unzip_file': {
'config' : {
'zipped_file': ZIP_FILE_PATH,
}
}
}
}
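Since the dictionary mirrors the yaml file shape exactly, the same environment expressed as yaml would look like the fragment below (values shown as literal placeholders, since CONF and ZIP_FILE_PATH are Python values in the example above):

```yaml
context:
  unittest:
    config:
      data_source_run_id: a-generated-uuid
      conf: {}            # stands in for CONF
      log_level: ERROR
      cleanup_files: false
solids:
  unzip_file:
    config:
      zipped_file: /path/to/file.zip   # stands in for ZIP_FILE_PATH
```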
While this provides fewer guarantees within the Python type system, this API results in very high-quality error checking and messaging from the dagster config schema.
- New testing APIs
Error:
AttributeError: type object 'PipelineDefinition' has no attribute 'create_sub_pipeline'
or
AttributeError: type object 'PipelineDefinition' has no attribute 'create_single_solid_pipeline'
The creation of "sub" and "single_solid" pipelines was awkward and error-prone. Instead, we have the new functions execute_solid and execute_solids. You can now execute a single solid with a single function call.
Before:
pipeline = PipelineDefinition.create_single_solid_pipeline(
define_fileload_pipeline(),
'unzip_file',
)
result = execute_pipeline(pipeline, environment)
assert result.success
assert os.path.exists(
result.result_for_solid('unzip_file').transformed_value())
After:
solid_result = execute_solid(
define_fileload_pipeline(),
'unzip_file',
environment=environment
)
assert solid_result.success
assert os.path.exists(solid_result.transformed_value())
Before (with stubbed inputs):
pipeline = PipelineDefinition.create_single_solid_pipeline(
define_fileload_pipeline(),
'split_headers_and_service_lines',
{
'split_headers_and_service_lines': {
'unzipped_file':
define_stub_solid('unzipped_path_value', unzipped_path)
}
},
)
result = execute_pipeline(pipeline, environment)
assert result.success
solid_result = result.result_for_solid('split_headers_and_service_lines')
assert os.path.exists(solid_result.transformed_value('header_file'))
assert os.path.exists(solid_result.transformed_value('service_lines_file'))
After (with stubbed inputs):
solid_result = execute_solid(
define_fileload_pipeline(),
'split_headers_and_service_lines',
inputs={
'unzipped_file': unzipped_path,
},
environment=environment,
)
assert os.path.exists(solid_result.transformed_value('header_file'))
assert os.path.exists(solid_result.transformed_value('service_lines_file'))
Before (subset execution):
pipeline = PipelineDefinition.create_sub_pipeline(
define_fileload_pipeline(),
['unzip_file'],
['split_headers_and_service_lines'],
{},
)
result = execute_pipeline(pipeline, environment)
assert result.success
solid_result = result.result_for_solid('split_headers_and_service_lines')
snapshot_check_results(snapshot, solid_result)
After (subset execution):
result_dict = execute_solids(
define_pipeline(),
['unzip_file', 'split_headers_and_service_lines'],
environment=environment,
)
snapshot_check_results(snapshot, result_dict['split_headers_and_service_lines'])
- Execution Context Lifecycle Changes
Error:
AttributeError: 'ExecutionContext' object has no attribute 'value'
This is officially the most conceptually difficult change. We changed the system so that the ExecutionContext passed to your solids (now RuntimeExecutionContext) is constructed by the system rather than by the user. The ExecutionContext object the user creates can be thought of as RuntimeExecutionContextParams; we opted against that name because it was excessively verbose.
Before:
with context.value('data_source_run_id', data_source_run_id),\
context.value('data_source', 'new_data'),\
context.value('pipeline_run_id', pipeline_run_id):
yield ExecutionContext(
loggers=[define_colored_console_logger('dagster', log_level)],
resources=resources
)
After:
yield ExecutionContext(
loggers=[define_colored_console_logger('dagster', log_level)],
resources=resources,
context_stack={
'data_source_run_id': data_source_run_id,
'data_source': 'new_data',
'pipeline_run_id': pipeline_run_id,
},
)
- Non-null by default
Error:
E dagster.core.errors.DagsterTypeError: Solid solid_name input input_name received value None which does not pass the typecheck for Dagster type PandasDataFrame. Step solid_name.transform
You have encountered a type error. Likely it is because in 0.2.8 types could accept None by default; this is no longer true in 0.3.0. You now have to opt into accepting nulls.
Before:
@solid(outputs=[OutputDefinition(dagster_type=dagster_pd.DataFrame)])
def return_none(info):
return None # None no longer allowed, would break at runtime
After:
@solid(outputs=[OutputDefinition(dagster_type=Nullable(dagster_pd.DataFrame))])
def return_none(info):
return None # Because of Nullable wrapper, this is ok
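Conceptually, Nullable just widens the type check to admit None. The following plain-Python analogue illustrates the rule; it is a sketch, not dagster's actual implementation:

```python
# Sketch of the non-null-by-default rule; not dagster's implementation.
def passes_typecheck(value, expected_type, nullable=False):
    if value is None:
        return nullable  # only Nullable-wrapped types admit None
    return isinstance(value, expected_type)

print(passes_typecheck(None, str))                 # False: non-null by default
print(passes_typecheck(None, str, nullable=True))  # True: opted in via Nullable
print(passes_typecheck('data', str))               # True: normal typecheck
```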