Skip to content

Instantly share code, notes, and snippets.

@seanlindo
Created August 4, 2022 23:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save seanlindo/a096af16dc34df4f4e6f31be3c2c5bae to your computer and use it in GitHub Desktop.
Save seanlindo/a096af16dc34df4f4e6f31be3c2c5bae to your computer and use it in GitHub Desktop.
Dagster Example
from dagster import IOManager, io_manager
class MyIOManager(IOManager):
    """IO manager that keeps every handled output in an in-memory dict.

    Entries are keyed by the ``(step_key, name)`` pair of the output that
    produced them.
    NOTE(review): values live only in this instance's ``storage_dict`` --
    presumably they are not shared across processes; confirm against the
    executor in use.
    """

    def __init__(self):
        # Maps (step_key, output name) -> the stored object.
        self.storage_dict = {}

    def handle_output(self, context, obj):
        """Store ``obj`` under the step key and name read from ``context``."""
        output_key = (context.step_key, context.name)
        self.storage_dict[output_key] = obj

    def load_input(self, context):
        """Return the object stored for the upstream output of ``context``.

        Raises:
            KeyError: if nothing was stored for that upstream output.
        """
        upstream = context.upstream_output
        lookup_key = (upstream.step_key, upstream.name)
        return self.storage_dict[lookup_key]
@io_manager
def my_io_manager(_):
    """Build a fresh ``MyIOManager``; the single argument is ignored."""
    manager = MyIOManager()
    return manager
from dagster import asset, with_resources, repository, define_asset_job, SourceAsset
from example_io_manager import my_io_manager
# Source asset "raw_users", bound to the "test_io_manager" IO manager key.
raw_users = SourceAsset(
    key="raw_users",
    io_manager_key="test_io_manager",
)
@asset(io_manager_key="test_io_manager")
def upstream_asset(raw_users):
    """Return ``raw_users`` unchanged; placeholder for future transformations.

    NOTE(review): the ``raw_users`` argument is presumably supplied from the
    ``raw_users`` source asset by name -- confirm against Dagster's asset
    dependency rules.
    """
    # TODO: apply the real transformations here; for now this is a pass-through.
    return raw_users
# TODO: What if there are many transformation steps? Does each one have to be an asset, or can they be separate ops that operate on the asset?
@repository
def repo():
    """Return the repository contents: the resourced assets plus one asset job.

    The assets ``raw_users`` and ``upstream_asset`` are passed through
    ``with_resources`` with ``my_io_manager`` registered under the
    "test_io_manager" key, followed by the "process_users" asset job.
    """
    resourced_assets = with_resources(
        [raw_users, upstream_asset],
        resource_defs={"test_io_manager": my_io_manager},
    )
    process_users_job = define_asset_job("process_users")
    return [*resourced_assets, process_users_job]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment