-
-
Save ravwojdyla/6a546e3fa65459b17413aac10c643f25 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@usable_as_dagster_type
class FSArtifactHandle(FileHandle):
    """File handle that wraps an ``FSArtifact`` so it can be passed between solids.

    Args:
        artifact: The artifact this handle refers to.
        catalog: Catalog class associated with the handle; it is stored on the
            instance but not otherwise used inside this class.
    """

    def __init__(self, artifact: FSArtifact, catalog: Type[Catalog] = Catalog):
        self._catalog = catalog
        # NOTE(review): the attribute name is misspelled (`_artifat`). It is
        # kept as-is because FSArtifactManager.handle_output reads it by this
        # exact name.
        self._artifat = artifact

    @property
    def path_desc(self) -> str:
        """Return the wrapped artifact's ``main_dir`` as the path description."""
        # TODO: this could be link etc
        wrapped = self._artifat
        return wrapped.main_dir
class FSArtifactManager(MemoizableIOManager):
    """IO manager for solids whose outputs are ``FSArtifactHandle`` objects.

    Args:
        catalog: Catalog class kept on the manager; it is stored but not used
            by any method in this class.
    """

    def __init__(self, catalog: Type[Catalog] = Catalog):
        self._catalog = catalog

    def has_output(self, context: OutputContext):
        """Report that the output exists.

        Always returns ``True`` after asserting that ``context`` is not ``None``.
        """
        assert context is not None, "Context is None"
        return True

    def load_input(self, context: InputContext):
        """Log the load attempt for ``context``.

        NOTE(review): nothing is returned here, so the downstream input will
        be ``None`` -- confirm whether that is intended.
        """
        message = f"Trying to load from {context}"
        context.log.info(message)

    def handle_output(self, context: OutputContext, out: FSArtifactHandle):
        """Log the id of the artifact wrapped by ``out``.

        Raises:
            ValueError: If ``out`` is not an ``FSArtifactHandle``.
        """
        if isinstance(out, FSArtifactHandle):
            context.log.info(f"Handling {out._artifat.id}")
            return
        raise ValueError("FSArtifactManager can only handle FSArtifactHandle")
@io_manager
def fs_artifact_io_manager(_) -> FSArtifactManager:
    """Build an ``FSArtifactManager`` with its default catalog.

    The single positional argument is ignored.
    """
    manager = FSArtifactManager()
    return manager
def pipeline_logic() -> FSArtifact:
    """Return the partitioned ``FSArtifact`` named ``"_dev_rav_ala"``.

    The original line ended with an extra closing parenthesis, which made the
    whole module fail to parse; the call is now balanced.
    """
    return FSArtifact.partitioned("_dev_rav_ala")
@solid
def efo_download_solid(context) -> FSArtifactHandle:
    """Run ``pipeline_logic`` and wrap the resulting artifact in a handle.

    ``context`` is accepted but not used in the body.
    """
    artifact = pipeline_logic()
    return FSArtifactHandle(artifact)
@pipeline(
    mode_defs=[
        ModeDefinition(
            name="only_mode",
            resource_defs={"io_manager": fs_artifact_io_manager},
        ),
    ],
    tags={MEMOIZED_RUN_TAG: "true"},
)
def efo_download_pipeline():
    """Pipeline with a single solid, ``efo_download_solid``.

    It defines one mode, ``"only_mode"``, whose ``io_manager`` resource is
    ``fs_artifact_io_manager``, and tags runs with ``MEMOIZED_RUN_TAG`` set to
    ``"true"``.
    """
    efo_download_solid()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment