Last active
September 23, 2022 20:12
-
-
Save goyalankit/a3ed0b73ba04110d7b07b45a8434e28f to your computer and use it in GitHub Desktop.
flyte poc
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import typing | |
from flytekit import task, workflow, get_reference_entity, kwtypes | |
from flytekit.models.core.identifier import ResourceType | |
from typing import Dict# noqa: E402 | |
@task
def say_hello() -> str:
    """Return the constant greeting string."""
    greeting = "hello world"
    return greeting
# Handle to a launch plan identified only by project, domain, name and
# version. Its interface is declared by hand here: one ``str`` input called
# ``my_input`` and one ``Dict`` output called ``result``.
ref_entity = get_reference_entity(
    resource_type=ResourceType.LAUNCH_PLAN,
    project="flyteexamples",
    domain="development",
    name="pcv2_poc.user_training_pipeline.fds_importer",
    version="fqL3NrXDGEcU3OySzrJunQ==",
    inputs=kwtypes(my_input=str),
    outputs=kwtypes(result=Dict),
)
@workflow
def my_wf() -> Dict:
    """Chain ``say_hello`` into the referenced launch plan.

    The value produced by ``say_hello`` is passed to ``ref_entity`` as
    ``my_input``; whatever ``ref_entity`` yields is the workflow output.
    """
    greeting = say_hello()
    return ref_entity(my_input=greeting)
if __name__ == "__main__":
    # NOTE(review): my_wf() invokes a reference launch plan; confirm that it
    # can be resolved (or is mocked) wherever this script is executed.
    wf_result = my_wf()
    print(f"Running my_wf() {wf_result}")
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import typing # noqa: E402 | |
from collections import Counter # noqa: E402 | |
from typing import Dict, Tuple # noqa: E402 | |
from flytekit import LaunchPlan, task, workflow # noqa: E402 | |
# from .core import tensorflow_training_step, fds_importer_step | |
@task(cache=True, cache_version='1.0')
def count_freq_words(input_string1: str) -> Dict:
    """Count how often each whitespace-separated word occurs in the input.

    Matching is case-sensitive, so for "The cat sat on the mat" the words
    "The" and "the" are tallied separately.
    """
    tally = Counter(input_string1.split())
    # Hand back a plain dict rather than the Counter subclass.
    return dict(tally)
@task
def do_something_else(input_dict1: Dict) -> Dict:
    """Print every key of the dictionary on its own line and return it as-is."""
    # Iterating a dict yields its keys; the values are never inspected.
    for key in input_dict1:
        print(key)
    return input_dict1
@workflow
def fds_importer(my_input: str) -> typing.NamedTuple("ResultFDS", [("result", Dict)]):
    """Count the words in ``my_input`` and publish the tally as ``result``.

    The text goes through ``count_freq_words``; the resulting dictionary is
    then handed to ``do_something_else`` and its output becomes the single
    named workflow output ``result``.

    The output tuple is declared with the field-list form of
    ``typing.NamedTuple``, because the keyword-argument form
    (``NamedTuple("ResultFDS", result=Dict)``) is deprecated in newer Python
    releases. Both forms create the same ``ResultFDS`` type.
    """
    word_counts = count_freq_words(input_string1=my_input)
    return do_something_else(input_dict1=word_counts)
# Launch plan wrapping the ``fds_importer`` workflow; only the workflow is
# supplied, with no explicit name or input overrides.
fds_importer_step = LaunchPlan.get_or_create(workflow=fds_importer)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment