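# Assumed imports so the snippets below hang together (a sketch only). Artifact and Inputs are the
# proposed API under discussion and do not exist in flytekit yet, so no import is shown for them.
import typing
from datetime import datetime
from typing import Annotated

import pandas as pd
from flytekit import task, workflow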
# The three options that we were talking about.
# Partition values will continue to be strings, so they'll be coerced into strings
#
# Option A - Simplest
# This is the closest to the original proposal.
# 1) Declare an instance of Artifact
# | 2) give it a name
# |    project & domain added at registration time
# ↓ ↓
my_artifact = Artifact(name="my_data", partitions=["a", "b"], with_time_partition=True)
# ↑ 4) ↳ existence of time partition declared separately (optional)
# 3) specify partition keys only (optional)
#
# 5) Calling the object in the signature will
@task  # ↓ bind partition values
def base_t2_orig(b_value: str) -> Annotated[pd.DataFrame, my_artifact(b=Inputs.b_value)]:
    df = ...   # ↳ object that will raise an error if the referenced
    return df  #   value is not an input
# We believe this option is the simplest because:
# - It requires the minimum amount of change to existing code. Annotations on functions won't change anything
#   downstream at all: the signature doesn't change and caching doesn't change (see the sketch after the cons below).
# - Partition values are known at run time, but before the task actually runs, just from the inputs.
#   This is a fact that we can potentially leverage in the future.
# Cons: | |
# - Partition values are strings, so specifying values this way means Flyte picks how to transform your inputs
#   into strings. This may be a problem for dates.
# - Ketan had some additional concern around input bindings being spread out... will confirm. | |
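# For illustration only (not part of the original notes): under Option A a downstream task consumes the
# output exactly as it does today, since the Annotated wrapper doesn't change the runtime type.
@task
def downstream_t(df: pd.DataFrame) -> int:
    # The producer's artifact annotation is invisible here; signature and caching behave as before.
    return len(df)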
# Option B - More explicit | |
# Instantiate and then wrap the result when returning | |
# 1) Before instantiating the Artifact, declare the Python type that the Artifact will be used to track | |
# | 2) Give it a name and partition keys as per Option A | |
# ↓ ↓ | |
my_artifact = Artifact[pd.DataFrame](name="flyteorg.test.yt.teststr", partitions=["a", "b"])

@task  # 3) Continue to annotate the return type with the artifact instance
def base_t2_orig(b_value: str) -> Annotated[pd.DataFrame, my_artifact]:
    df = ...
    # 4) When returning, wrap the object - the type should match the [] above
    # |  5) and provide partition values as kwargs
    # ↓  ↓
    return my_artifact(df, a="a_value", b=b_value + "2")
# This is a more powerful variant of the first option because it allows users to specify partition values at
# runtime (see the sketch after the cons below), and this form factor can also be used to add generic,
# queryable metadata to the artifact in the near future.
# Also, downstream tasks still don't need to change.
# Cons:
# - The return statement may look weird: you return the result of a function call, not the object itself.
# - Not sure this option works for returning from workflows, since a workflow body is compile-time code with
#   no compute (i.e., what would run the + "2"?).
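# A sketch (illustrative names, assuming the Option B API above) of why runtime kwargs help with the date
# concern noted under Option A: the user controls the string form of the partition value instead of relying
# on a default coercion.
date_artifact = Artifact[pd.DataFrame](name="flyteorg.test.yt.daily", partitions=["ds"])

@task
def daily_load(run_date: datetime) -> Annotated[pd.DataFrame, date_artifact]:
    df = ...
    # Format the date explicitly rather than letting Flyte pick the string representation.
    return date_artifact(df, ds=run_date.strftime("%Y-%m-%d"))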
# Option C - Most explicit | |
# I think maybe save this implementation for last as it's the most complicated and will require the most changes. | |
# This example also shows a workflow because it's a bit more complicated. | |
# | |
# 1) Start by subclassing and specifying the generic type | |
class MyData(Artifact[pd.DataFrame]):
    partitions = ["a", "b"]
# 2) There is no Annotated here, the subclass is returned directly | |
@workflow  # ↓
def upstream_wf(input_a: datetime) -> typing.Tuple[int, MyData]:  # but multiple outputs should continue to work
    int_result = some_task()
    df_result = some_task_2()
    str_result = some_task_3()
    # 3) When returning, instantiate the subclass and provide partition values as kwargs
    # ↓
    return int_result, MyData(df_result, a="constant str okay", b=Inputs.input_a).set_additional_metadata(...)  # ???
# ↑
# 4) Note that if we had done b=str_result here it wouldn't work, not without
#    a lot of additional work, because it's a workflow. But using a string value
#    in the return from a task would've worked. If we get this working for workflows
#    we can also support set_additional_metadata. It behaves basically like additional
#    bindings on the workflow end-node.
@workflow
def downstream_wf(input_d: pd.DataFrame):
    ...

@workflow
def downstream_wf_2(input_d: MyData):
    ...

@workflow
def parent_wf():
    int_result, df_result = upstream_wf(input_a=datetime.now())
    # 5) Note the call patterns here: for downstream tasks/workflows taking the original type, you need to
    #    dereference the artifact_value  ↓
    downstream_wf(input_d=df_result.artifact_value)
    # 6) But you don't need to if the input is the subclass itself.
    downstream_wf_2(input_d=df_result)
NB: Why does it need to be Inputs.input_a rather than just input_a? Is this related to not wanting artifacts to be in the critical path? If we keep metadata separate, does that fix this problem (as above)?
Is there a way to not derive the artifact from the input at all? E.g. the inputs are hyperparameters and data, and the output is a trained model?
This was actually functionality I requested. It feels like the metadata system will be of pretty limited utility unless users can tack additional stuff on it.
Yes, I agree with this concern; I think it's good when libraries offer one single way of doing things.
I understand that option 3 is a lot more complex to implement. Do you think it would be realistic to come up with a single syntax and then implement only a subset of the functionality to make the initial complexity similar to that of option 1?
Is it correct that option 3 (or similar) would be a lot easier to implement if initially one needs to instantiate the artifact in a task and not in a workflow?
Or are there any other limitations one could introduce initially to make the complexity of option 3 similar to option 1 but already nail down a single syntax?
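A rough sketch of what such a restricted subset might look like (illustrative only): reuse the Option C syntax from above, but only allow the artifact to be instantiated inside a task, where the notes above say runtime values already work.

class MyData(Artifact[pd.DataFrame]):
    partitions = ["a", "b"]

@task
def produce_data(b_value: str) -> MyData:
    df = ...
    # Runtime values are fine here because this is a task, not a workflow end-node binding.
    return MyData(df, a="constant str okay", b=b_value)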