Skip to content

Instantly share code, notes, and snippets.

@pingsutw
Created February 25, 2022 15:51
Show Gist options
  • Save pingsutw/187ca5e340308551a9926f9b17872290 to your computer and use it in GitHub Desktop.
Save pingsutw/187ca5e340308551a9926f9b17872290 to your computer and use it in GitHub Desktop.
import json
import os
from dataclasses import dataclass
import typing
from dataclasses_json import dataclass_json
from flytekit import task, workflow
from flytekit.types.directory import FlyteDirectory
from flytekit.types.file import FlyteFile
@dataclass_json
@dataclass
class Output:
    """Bundle of artifact files handed from one task to the next.

    Only ``manifest`` must point at a file; every other artifact may be
    ``None``. No field has a default, so all four have to be supplied when
    an instance is constructed.
    """

    # Required artifact.
    manifest: FlyteFile

    # Optional artifacts (each one may be None).
    catalog: typing.Optional[FlyteFile]
    run_results: typing.Optional[FlyteFile]
    sources: typing.Optional[FlyteFile]
@task(cache_version="1", cache=True)
def create_outputs_task(input_dir: FlyteDirectory) -> Output:
    """Write a small JSON manifest into ``input_dir`` and return it as an ``Output``.

    Args:
        input_dir: Directory in which ``manifest.json`` is created.

    Returns:
        An ``Output`` whose ``manifest`` refers to the written file; the
        ``catalog``, ``run_results`` and ``sources`` artifacts are ``None``.
    """
    manifest_path = os.path.join(input_dir, "manifest.json")

    # Create the target directory when it is missing so the write below does
    # not fail with FileNotFoundError on a fresh path.
    parent_dir = os.path.dirname(manifest_path)
    if parent_dir:
        os.makedirs(parent_dir, exist_ok=True)

    data = {"hello": "world"}

    # Serialize the dict itself. Passing an already-encoded JSON string to
    # json.dump would double-encode it, and json.load on the reading side
    # would then return a str instead of the original dict.
    with open(manifest_path, "w", encoding="utf-8") as outfile:
        json.dump(data, outfile)

    # Build the result only once the manifest actually exists on disk.
    return Output(
        manifest=FlyteFile(manifest_path),
        catalog=None,
        run_results=None,
        sources=None,
    )
@task(cache_version="1", cache=True)
def consume_outputs_task(output: Output):
    """Parse the JSON manifest referenced by ``output`` and print the result.

    Args:
        output: Bundle whose ``manifest`` file is opened for reading and
            decoded as JSON.
    """
    with open(output.manifest, mode="r") as manifest_handle:
        parsed_manifest = json.load(manifest_handle)

    # Echo the decoded content to stdout.
    print(parsed_manifest)
@workflow
def the_workflow(input_dir: FlyteDirectory):
    """Run the producer task on ``input_dir`` and feed its result to the consumer task.

    Args:
        input_dir: Directory forwarded unchanged to ``create_outputs_task``.

    Returns:
        Whatever ``consume_outputs_task`` yields for the produced output.
    """
    produced = create_outputs_task(input_dir=input_dir)
    return consume_outputs_task(output=produced)
if __name__ == "__main__":
    # Execute the workflow locally only when this file is run as a script.
    # Without the guard, merely importing the module would trigger a full
    # workflow run as an import-time side effect.
    the_workflow(input_dir=FlyteDirectory("/tmp/test"))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment