@wild-endeavor
Last active October 12, 2023 13:01
Artifact UX conclusion
# The three options that we were talking about.
# Partition values will continue to be strings, so non-string values will be coerced into strings.
#
# Assumed imports for the sketches below (Artifact and Inputs are the proposed constructs discussed
# in this gist, not an existing flytekit import):
#   import typing
#   from datetime import datetime
#   from typing import Annotated
#   import pandas as pd
#   from flytekit import task, workflow
#
# Option A - Simplest
# This is the closest to the original proposal.
# 1) Declare an instance of Artifact
# |                    2) give it a name
# |                    |  project & domain added at registration time
# ↓                    ↓
my_artifact = Artifact(name="my_data", partitions=["a", "b"], with_time_partition=True)
#                                      ↑                      ↳ 4) existence of time partition declared separately (optional)
#                                      3) specify partition keys only (optional)
#
#                                          5) Calling the object in the signature will
@task #                                                    ↓ bind partition values
def base_t2_orig(b_value: str) -> Annotated[pd.DataFrame, my_artifact(b=Inputs.b_value)]:
    df = ...  #                                                          ↳ Inputs.b_value raises an error
    return df #                                                            if b_value is not an input of the task
# We believe this option is the simplest because it
# - requires the minimum amount of change to existing code. Annotating a function won't change anything
#   downstream at all: the signature doesn't change and caching doesn't change (see the consumer sketch below).
# - Partition values are known at run time but before the task actually runs, since they come only from the
#   inputs. This is a fact that we can potentially leverage in the future.
# Cons:
# - Partition values are strings, so specifying values like this means Flyte picks how to transform your inputs
#   into strings. This may be a problem for dates.
# - Ketan had some additional concern around input bindings being spread out... will confirm.
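#
# A minimal sketch of the "downstream doesn't change" point above, reusing base_t2_orig and the proposed
# constructs from this gist (hypothetical, not a shipped API; downstream_consumer and option_a_wf are
# illustrative names): the consumer only ever sees pd.DataFrame.
@task
def downstream_consumer(df: pd.DataFrame) -> int:
    # Nothing artifact-related appears in this signature, so interfaces and caching behave as before.
    return len(df)

@workflow
def option_a_wf(b_value: str) -> int:
    df = base_t2_orig(b_value=b_value)
    return downstream_consumer(df=df)
#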
# Option B - More explicit
# Instantiate and then wrap the result when returning
#                     1) Before instantiating the Artifact, declare the Python type that the Artifact will be used to track
#                     |              2) Give it a name and partition keys as per Option A
#                     ↓              ↓
my_artifact = Artifact[pd.DataFrame](name="flyteorg.test.yt.teststr", partitions=["a", "b"])
@task # 3) Continue to annotate the return type with the artifact instance
def base_t2_orig(b_value: str) -> Annotated[pd.DataFrame, my_artifact]:
    df = ...
    #                  4) When returning, wrap the object - the type should match the [] above
    #                  |   5) and provide partition values as kwargs
    #                  ↓   ↓
    return my_artifact(df, a="a_value", b=b_value + "2")
# This is a more powerful variant of the first option because it lets users specify partition values at
# runtime (see the date-partition sketch below), and the same form factor can also be used to attach generic,
# queryable metadata to the artifact in the near future.
# Also, downstream tasks still don't need to change.
# Cons:
# - The return statement may look odd: you return the result of a function call rather than the object itself.
# - Not sure whether this option works for returning from workflows, since a workflow body is compile-time code
#   with no compute (i.e., what would run the +"2"?).
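#
# A minimal, hypothetical sketch of how Option B's runtime values sidestep the date-coercion concern from
# Option A: the task author formats the date string explicitly instead of relying on Flyte's implicit coercion.
# dated_artifact, produce_daily, and the "ds" partition key are illustrative names, not an existing API.
dated_artifact = Artifact[pd.DataFrame](name="my_dated_data", partitions=["ds"])

@task
def produce_daily(run_date: datetime) -> Annotated[pd.DataFrame, dated_artifact]:
    df = ...
    # The partition value is a string whose format is chosen by the task author, e.g. an ISO-style day.
    return dated_artifact(df, ds=run_date.strftime("%Y-%m-%d"))
#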
# Option C - Most explicit
# I think maybe save this implementation for last as it's the most complicated and will require the most changes.
# This example also shows a workflow because it's a bit more complicated.
#
# 1) Start by subclassing and specifying the generic type
class MyData(Artifact[pd.DataFrame]):
    partitions = ["a", "b"]
# 2) There is no Annotated here, the subclass is returned directly
@workflow #                                              ↓
def upstream_wf(input_a: datetime) -> typing.Tuple[int, MyData]: # but multiple outputs should continue to work
    int_result = some_task()
    df_result = some_task_2()
    str_result = some_task_3()
    # 3) When returning, instantiate the subclass and provide partition values as kwargs
    #                  ↓
    return int_result, MyData(df_result, a="constant str okay", b=Inputs.input_a).set_additional_metadata(...) # ???
#                                                               ↑
# 4) Note that b=str_result would not work here, not without a lot of additional
#    work, because this is a workflow. Using a runtime string value in a return
#    from a task would've worked (see the task-based sketch after this example).
#    If we get this working for workflows, we can also support set_additional_metadata;
#    it behaves basically like additional bindings on the workflow end-node.
@workflow
def downstream_wf(input_d: pd.DataFrame):
    ...
@workflow
def downstream_wf_2(input_d: MyData):
    ...
@workflow
def parent_wf():
    int_result, df_result = upstream_wf(input_a=datetime.now())
    # 5) Note the call patterns here: for downstream tasks/workflows taking the original type, you need to
    #                              ↓ dereference the artifact_value
    downstream_wf(input_d=df_result.artifact_value)
    # 6) But you don't need to if the input is the subclass itself.
    downstream_wf_2(input_d=df_result)
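#
# As referenced in note 4) above, a minimal hypothetical sketch of Option C used from a task rather than a
# workflow: runtime partition values are fine here because they exist when the task body executes.
# produce_my_data is an illustrative name, not part of any existing API.
@task
def produce_my_data(input_a: datetime) -> MyData:
    df = ...
    # Inside a task body, b can be an ordinary runtime string; no Inputs.* binding object is needed.
    return MyData(df, a="constant str okay", b=input_a.strftime("%Y-%m-%d"))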
@elibixby

NB: Why does it need to be Inputs.input_a rather than just input_a? Is this related to not wanting artifacts to be in the critical path? If we keep metadata separate, does that fix this problem (as above)?

Is there a way to not derive the artifact from the inputs at all? E.g. the inputs are hyperparameters and data, and the output is a trained model?

This was actually functionality I requested. It feels like the metadata system will be of pretty limited utility unless users can tack additional stuff on it.
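
For example, with the Option B form factor from above, this might look like the following (hypothetical and illustrative only, not a committed API; FlyteFile just stands in for a serialized model):

trained_model = Artifact[FlyteFile](name="trained_model", partitions=["learning_rate", "epochs"])

@task
def train(data: pd.DataFrame, learning_rate: float, epochs: int) -> Annotated[FlyteFile, trained_model]:
    model_file = ...  # training elided
    # The partition/metadata values come from the hyperparameter inputs rather than from the data itself,
    # so the artifact records how the model was produced instead of being derived from a single input.
    return trained_model(model_file, learning_rate=str(learning_rate), epochs=str(epochs))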
