Skip to content

Instantly share code, notes, and snippets.

@ravwojdyla
Last active February 16, 2021 11:22
Show Gist options
  • Save ravwojdyla/6a546e3fa65459b17413aac10c643f25 to your computer and use it in GitHub Desktop.
Save ravwojdyla/6a546e3fa65459b17413aac10c643f25 to your computer and use it in GitHub Desktop.
@usable_as_dagster_type
class FSArtifactHandle(FileHandle):
    """File handle that wraps an ``FSArtifact`` so it can flow between solids."""

    def __init__(self, artifact: FSArtifact, catalog: Type[Catalog] = Catalog):
        # Both arguments are stored as given; nothing is validated here.
        self._catalog = catalog
        # NOTE: the attribute name is misspelled ("_artifat"), but it is read
        # from outside this class under that exact name, so it is kept.
        self._artifat = artifact

    @property
    def path_desc(self) -> str:
        """Describe the handle's location using the artifact's ``main_dir``."""
        # TODO: this could be link etc
        wrapped = self._artifat
        return wrapped.main_dir
class FSArtifactManager(MemoizableIOManager):
    """IO manager for solid outputs that are ``FSArtifactHandle`` objects.

    The manager does not persist or load anything itself: it validates the
    output type and logs what it was asked to handle.
    """

    def __init__(self, catalog: Type[Catalog] = Catalog):
        # Stored as given; not used by any method of this class.
        self._catalog = catalog

    def has_output(self, context: OutputContext) -> bool:
        """Report whether the output described by ``context`` already exists.

        Always returns ``True``; no lookup is performed.
        NOTE(review): the unconditional ``True`` looks like a placeholder --
        confirm this is the intended memoization behaviour.
        """
        assert context is not None, "Context is None"
        return True

    def load_input(self, context: InputContext) -> None:
        """Log the load request for ``context``.

        Nothing is loaded; the method implicitly returns ``None``.
        """
        context.log.info(f"Trying to load from {context}")

    def handle_output(self, context: OutputContext, out: FSArtifactHandle) -> None:
        """Validate ``out`` and log the id of the artifact it wraps.

        Raises:
            ValueError: if ``out`` is not an ``FSArtifactHandle``.
        """
        if not isinstance(out, FSArtifactHandle):
            raise ValueError("FSArtifactManager can only handle FSArtifactHandle")
        # Reads the handle's private (misspelled) ``_artifat`` attribute.
        context.log.info(f"Handling {out._artifat.id}")
@io_manager
def fs_artifact_io_manager(_) -> FSArtifactManager:
    # The single argument is ignored; the manager is built with its
    # default catalog.
    manager = FSArtifactManager()
    return manager
def pipeline_logic() -> FSArtifact:
    """Build the artifact via ``FSArtifact.partitioned("_dev_rav_ala")``."""
    # The original line had an unbalanced trailing ")" and did not parse.
    return FSArtifact.partitioned("_dev_rav_ala")
@solid
def efo_download_solid(context) -> FSArtifactHandle:
    # Run the pipeline logic and wrap the resulting artifact in a handle.
    # ``context`` is not used.
    artifact = pipeline_logic()
    return FSArtifactHandle(artifact)
# Pipeline with a single mode ("only_mode") that routes outputs through
# fs_artifact_io_manager; the run is tagged with MEMOIZED_RUN_TAG.
@pipeline(
    mode_defs=[
        ModeDefinition(
            name="only_mode",
            resource_defs={"io_manager": fs_artifact_io_manager},
        ),
    ],
    tags={MEMOIZED_RUN_TAG: "true"},
)
def efo_download_pipeline():
    # The pipeline consists of this one solid invocation.
    efo_download_solid()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment