@madkinsz
Last active Jul 1, 2021
Prefect 0.15.0: Improved flow run creation and inspection

We’re excited to announce the release of Prefect 0.15.0! This release is the result of months of work to improve the interface for creating and inspecting flow runs. 0.15.0 contains a new command line interface for creating flow runs as well as a suite of objects for inspecting flow runs without writing GraphQL queries.

For the full list of enhancements and features, check out the release changelog.

. . .

The core of Prefect is orchestrating the execution of your workflows. When using Prefect Cloud or Server, you indicate a workflow should be run by creating a "flow run". A flow run can be created from the UI, via a GraphQL call, from a task in another flow run, or using the CLI. Continuous improvements have been made to launching flow runs from the web UI: log levels can be easily adjusted, configuration can be modified per run, and JSON flow parameters have syntax highlighting. With the UI, we can redesign something easily because you're not relying on the interface programmatically. With the Prefect Python library, we're careful to limit our redesigns until there's strong evidence we are correct, since a redesign requires changes to our programmatic interface. In 0.15.0, we've added

  • prefect run -- a new CLI for running flows
  • create_flow_run and get_task_run_result tasks -- to create subflows and retrieve data from them
  • FlowRunView, TaskRunView, FlowView, and TenantView objects -- to pull data from the backend without writing GraphQL
  • simpler authentication using API keys

and more, but there's only so much we can fit in a blog post!

A new CLI for running flows

We created a new CLI for running flows: prefect run.

prefect run is designed to make the transition from local execution to running with a Cloud or Server backend seamless. Previously, when developing, you needed to define your flow then call flow.run() at the bottom; then, to test the flow, you'd run python my_flow_file.py; then, when the flow is ready to be pushed to Cloud, you'd change your flow.run() call to flow.register("project") and go to the UI to start a flow run. With prefect register (introduced in 0.14.13) and prefect run, you can drop this cruft from your flow. Limiting your flow files to the definition of your flow and using the CLI to perform actions on that definition means you don't have to edit your flow to change how it is run.

prefect run will take an import or file path to run a flow locally; this replaces using flow.run(). For example, the following will run the file, extract the flow object, and call flow.run() for you:

$ prefect run -p my_flow_file.py

To transition this flow from local to Cloud, we can register it with an interface that feels the same; this replaces flow.register():

$ prefect register -p my_flow_file.py --project example

Before we continue, let's switch to a real flow. We've added a "Hello World" flow to Prefect to make getting started really easy.

First, try running the flow locally:

$ prefect run -m prefect.hello_world
Retrieving local flow... Done
Running flow locally...
└── 11:21:52 | INFO    | Beginning Flow run for 'hello-world'
└── 11:21:52 | INFO    | Task 'name': Starting task run...
└── 11:21:52 | INFO    | Task 'name': Finished task run for task with final state: 'Success'
└── 11:21:52 | INFO    | Task 'capitalize': Starting task run...
└── 11:21:52 | INFO    | Task 'capitalize': Finished task run for task with final state: 'Success'
└── 11:21:52 | INFO    | Task 'say_hello': Starting task run...
└── 11:21:52 | INFO    | Hello World
└── 11:21:52 | INFO    | Task 'say_hello': Finished task run for task with final state: 'Success'
└── 11:21:52 | INFO    | Flow run SUCCESS: all reference tasks succeeded
Flow run succeeded!

Then, register the flow:

# You may need to create a project first with `prefect create project 'example'`
$ prefect register -m prefect.hello_world --project example
Collecting flows...
Processing 'prefect.hello_world':
  Building `Module` storage...
  Registering 'hello-world'... Done
  └── ID: 8a896c14-07ac-4538-bed1-162e188d780f
  └── Version: 1
======================== 1 registered ========================

Now, a flow run can be created using the prefect run CLI:

$ prefect run --name "hello-world" --watch
Looking up flow metadata... Done
Creating run for flow 'hello-world'... Done
└── Name: electric-mandrill
└── UUID: 55955649-f2e7-4d43-938a-f5b08974bab9
└── Labels: []
└── Parameters: {}
└── Context: {}
└── URL: https://cloud.prefect.io/prefect-engineering/flow-run/55955649-f2e7-4d43-938a-f5b08974bab9
Watching flow run execution...
└── 11:22:38 | INFO    | Entered state <Scheduled>: Flow run scheduled.
└── 11:22:45 | INFO    | Entered state <Submitted>: Submitted for execution
└── 11:22:45 | INFO    | Submitted for execution: Job prefect-job-9d1c807c
└── 11:22:47 | INFO    | Beginning Flow run for 'hello-world'
└── 11:22:47 | INFO    | Entered state <Running>: Running flow.
└── 11:22:47 | INFO    | Task 'name': Starting task run...
└── 11:22:47 | INFO    | Task 'name': Finished task run for task with final state: 'Success'
└── 11:22:48 | INFO    | Task 'capitalize': Starting task run...
└── 11:22:48 | INFO    | Task 'capitalize': Finished task run for task with final state: 'Success'
└── 11:22:48 | INFO    | Task 'say_hello': Starting task run...
└── 11:22:48 | INFO    | Hello World
└── 11:22:48 | INFO    | Task 'say_hello': Finished task run for task with final state: 'Success'
└── 11:22:48 | INFO    | Flow run SUCCESS: all reference tasks succeeded
└── 11:22:48 | INFO    | Entered state <Success>: All reference tasks succeeded.
Flow run succeeded!

What's with the --watch flag? When used with a backend, prefect run will typically exit immediately after creating a flow run since the execution of the flow run is managed by an agent. Above, we opted to watch the flow run which will stream logs and state changes from the flow run to your machine. This provides some instant feedback about how the flow run is going. Of course, we display a link to monitor the flow run from the UI and you can see all of the logs there as well.

What about the old command? prefect run flow will stick around for compatibility, but is deprecated now. Creating a new command lets us change behavior and arguments without worrying about backwards compatibility. This naming and deprecation matches the prefect register command, which replaced prefect register flow.

Agent warnings

A common question is: Why is my flow run stuck in a 'Scheduled' state; why hasn't it started?

When prefect run --watch is used, we can help answer this question. If the flow run does not enter a 'Running' state 15 seconds after being created, we will investigate. Flow runs are matched with agents by "labels". The labels on an agent must include all of the labels on the flow run for the agent to pick up the flow run. We pull all of your agents from the backend and compare your labels to the flow run's labels to determine if there are any matching agents. If not, we'll tell you to start an agent with the required set of labels. If there is an agent with matching labels, we'll check the last time the agent contacted the backend to see if it's healthy and warn you if it is not. Combined with the new agent screens in the UI, we're improving communication where things can go wrong so you can quickly identify a solution and focus on your engineering tasks at hand.
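The label rule above can be sketched in a few lines of plain Python. This is only an illustration of the matching logic as described here, not Prefect's implementation: the `Agent` tuple, the `matching_agents` and `is_healthy` helpers, and the one-minute silence threshold are all hypothetical names and values chosen for the example.

```python
from datetime import datetime, timedelta, timezone
from typing import Iterable, List, NamedTuple, Set


class Agent(NamedTuple):
    # Hypothetical stand-in for the agent metadata pulled from the backend
    name: str
    labels: Set[str]
    last_contact: datetime


def matching_agents(flow_run_labels: Iterable[str], agents: Iterable[Agent]) -> List[Agent]:
    """Return the agents whose labels include every label on the flow run."""
    required = set(flow_run_labels)
    return [agent for agent in agents if required <= agent.labels]


def is_healthy(agent: Agent, now: datetime, max_silence: timedelta) -> bool:
    """Treat an agent as healthy if it contacted the backend recently enough."""
    return now - agent.last_contact <= max_silence


now = datetime(2021, 7, 1, 12, 0, tzinfo=timezone.utc)
agents = [
    Agent("docker-agent", {"docker", "prod"}, now - timedelta(seconds=5)),
    Agent("local-agent", {"dev"}, now - timedelta(hours=2)),
]

# The flow run needs "docker": only the first agent carries that label
print([agent.name for agent in matching_agents({"docker"}, agents)])

# No agent carries both labels, so the user would be told to start one
print(matching_agents({"docker", "gpu"}, agents))

# An assumed threshold of one minute flags the second agent as unhealthy
print([is_healthy(agent, now, timedelta(minutes=1)) for agent in agents])
```

Note that the check is a subset test, not an equality test: an agent may carry extra labels and still pick up the flow run.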

Agentless execution

When you're hacking away at a flow locally, it can feel a bit onerous to start an agent to execute your flow. In 0.15.0, we have introduced a new concept of "agentless flow run execution". Using the prefect run CLI, you can execute a flow directly with the rich set of features enabled by communicating with the backend API. For example, you'll be able to see your flow run from the UI, your Cloud concurrency limits will be respected, and you'll have a context populated with real object ids. At the same time, you'll be able to set breakpoints in your tasks and debug your flow locally.

Using our hello-world flow from above, pass the --execute flag to prefect run to execute the flow run without an agent.

$ prefect run --name "hello-world" --execute

This works by:

  • Creating a flow run for you in the backend with an extra unique label so an agent does not pick up the flow run
  • Creating a subprocess that includes environment variables from your RunConfig (other infrastructure settings are ignored)
  • Pulling the flow from storage inside the subprocess
  • Executing the flow with reporting to the backend
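As a rough sketch of the first two steps, the snippet below generates a unique label in the same shape as the `agentless-run-...` label shown in the CLI output and launches a child process with extra environment variables layered over the current environment. It uses only the standard library; the helper names, the `MY_SETTING` variable, and the one-line child program are hypothetical, and the real CLI does considerably more (creating the run in the backend, pulling the flow from storage, reporting state).

```python
import os
import subprocess
import sys
import uuid
from typing import Dict


def agentless_label() -> str:
    """Build a unique label that no running agent will have been started with."""
    return f"agentless-run-{uuid.uuid4().hex[:8]}"


def run_in_subprocess(run_config_env: Dict[str, str], code: str) -> subprocess.CompletedProcess:
    """Run `code` in a child Python process with the given variables added to the environment."""
    env = {**os.environ, **run_config_env}
    return subprocess.run(
        [sys.executable, "-c", code],
        env=env,
        capture_output=True,
        text=True,
        check=True,  # raise if the child exits with a non-zero status
    )


label = agentless_label()
result = run_in_subprocess(
    {"MY_SETTING": "from-run-config"},  # hypothetical variable standing in for RunConfig env
    "import os; print(os.environ['MY_SETTING'])",
)

print(label)
print(result.stdout.strip())
```

Because the label is freshly generated for each run, the subset rule used for agent matching guarantees that no existing agent will claim the flow run.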

Agents add a layer to Prefect in which we help you manage the infrastructure your flow is going to be deployed on. Since the agent is removed from the picture here, you are taking full ownership of the infrastructure your flow runs on. If you want the flow to run in a container, you must set up the container yourself then call this command. This gives deep control over the execution environment and scheduling of flow runs to the users that need it.

Sub-flow result passing

Since we introduced the ability to launch flows from a flow, users have been asking for a first-class way to pass task results between flows. In 0.15.0, we're releasing new tasks to create sub-flows and pull results from them with a very simple interface.

The existing task to create sub-flows, StartFlowRun, provides a wait flag which changes its return value from a flow run id to a state--this makes it hard to compose with other tasks. A much simpler create_flow_run task was introduced to create a child flow run. This task always returns a flow run id immediately after creation. We then added a get_task_run_result(flow_run_id: str, task_slug: str) task which will retrieve the result from the task in the given flow run.

For example, we can pull a result from a child flow into a parent flow then act on that data:

from typing import List

from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult
from prefect.tasks.prefect.flow_run import create_flow_run, get_task_run_result


@task(result=LocalResult())
def create_some_data(length: int):
    return list(range(length))


with Flow("child") as child_flow:
    data_size = Parameter("data_size", default=5)
    data = create_some_data(data_size)


@task(log_stdout=True)
def transform_and_show(data: List[int]) -> List[int]:
    print(f"Got: {data!r}")
    new_data = [x + 1 for x in data]
    print(f"Created: {new_data!r}")
    return new_data


with Flow("parent") as parent_flow:
    child_run_id = create_flow_run(
        flow_name=child_flow.name, parameters=dict(data_size=10)
    )
    child_data = get_task_run_result(child_run_id, "create_some_data-1")
    transform_and_show(child_data)

To run this example, save it to a file named parent-child-flow.py. Then we can register and run the parent:

$ prefect register -p parent-child-flow.py --project example
$ prefect run --name "parent" --execute
Looking up flow metadata... Done
Creating run for flow 'parent'... Done
└── Name: quixotic-orangutan
└── UUID: 8b17ffed-c70e-4a20-a3b4-efb2eebba9e9
└── Labels: ['agentless-run-2e92a4fd']
└── Parameters: {}
└── Context: {}
└── URL: https://cloud.prefect.io/prefect-engineering/flow-run/8b17ffed-c70e-4a20-a3b4-efb2eebba9e9
Executing flow run...
└── 13:19:07 | INFO    | Creating subprocess to execute flow run...
└── 13:19:08 | INFO    | Beginning Flow run for 'parent'
└── 13:19:09 | INFO    | Task 'create_flow_run': Starting task run...
└── 13:19:10 | INFO    | Creating flow run 'quixotic-orangutan-child' for flow 'child'...
└── 13:19:10 | INFO    | Created flow run 'quixotic-orangutan-child': https://cloud.prefect.io/prefect-engineering/flow-run/5eed1147-9247-48da-86e0-c7cee995cea9
└── 13:19:11 | INFO    | Task 'create_flow_run': Finished task run for task with final state: 'Success'
└── 13:19:11 | INFO    | Task 'get_task_run_result': Starting task run...
└── 13:19:22 | INFO    | Task 'get_task_run_result': Finished task run for task with final state: 'Success'
└── 13:19:23 | INFO    | Task 'transform_and_show': Starting task run...
└── 13:19:24 | INFO    | Got: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
└── 13:19:24 | INFO    | Created: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
└── 13:19:24 | INFO    | Task 'transform_and_show': Finished task run for task with final state: 'Success'
└── 13:19:25 | INFO    | Flow run SUCCESS: all reference tasks succeeded
Flow run succeeded!

Inspection of flow runs

The UI provides a wonderful view into your flow runs with minimal effort, and we want to bring our Python library in line with this ease. 0.15.0 creates a prefect.backend module with Python objects that provide views of the backend without requiring you to write any GraphQL queries. GraphQL is a powerful view into the Prefect backend, allowing you to retrieve exactly the data you want from nested objects, but often you just want to pull a typical set of data for a single object. Since flow runs are at the center of what Prefect does, we've focused on making it simple to pull useful data for a flow run.

The FlowRunView lets you pull the most recent information about a flow run from the backend. For example:

from prefect.backend import FlowRunView

# Create a new instance using an ID from the UI or from the output of the `prefect run` command
flow_run = FlowRunView.from_flow_run_id("4c0101af-c6bb-4b96-8661-63a5bbfb5596")

# You now have access to information about the flow run
flow_run.state       # <Success: "All reference tasks succeeded.">
flow_run.labels      # {"foo"}
flow_run.parameters  # {"name": "Marvin"}
flow_run.run_config  # LocalRun(...)
flow_run.states      # [Scheduled(), Submitted(), Running(), Success()]

# You can also retrieve information about the flow
flow = flow_run.get_flow_metadata()  # FlowView(...)

# Or about a task run
task_run = flow_run.get_task_run(task_slug='say_hello-1')  # TaskRunView(...)

Notice that the FlowRunView provides methods to access two related structures: FlowView and TaskRunView. FlowView exposes the metadata that we store about a flow when it is registered. TaskRunView exposes metadata for a single run of a task in your flow.

A FlowView can also be directly instantiated with various lookup methods:

from prefect.backend import FlowView

flow_view = FlowView.from_flow_name(flow_name="hello-world")

# If a name is not unique, a project will need to be provided
flow_view = FlowView.from_flow_name(flow_name="hello-world", project_name="example")

# Or a lookup can be performed by id
flow_view = FlowView.from_flow_id(flow_id="8a896c14-07ac-4538-bed1-162e188d780f")

# The view provides information about the flow
flow_view.run_config  # LocalRun(...)
flow_view.storage     # Module(...)
flow_view.core_version  # 0.15.0
flow_view.project_name  # "example"

A TaskRunView can also be instantiated directly, but it requires a flow run id so we recommend using FlowRunView().get_task_run(...):

from prefect import task
from prefect.backend import TaskRunView

# Presume we have a flow with the following task
@task
def foo():
    return "foobar!"

# Instantiate a task run directly
task_run = TaskRunView.from_task_slug("foo-1", flow_run_id="<id>")

# As with the other views, we can inspect the task run
task_run.state      # Success(...)
task_run.map_index  # -1  (not mapped)

# We can also retrieve results
task_run.get_result()  # "foobar!"

The most powerful feature of the TaskRunView is its ability to pull results from their location on your infrastructure. This object does the heavy lifting of looking up the location of the result and pulling the data from it. It also handles mapped tasks which require the result to be pulled from many locations. Check out the task run view documentation for more details.

These backend objects were designed in conjunction with all of the features described above. We've leveraged their simple interface to build new, powerful features with minimal effort. Without these abstractions, the code for our new features would have a lot of repetition and complexity. We're excited to see what you can build with them as well!

API keys for simple auth

The authentication paradigm in Prefect used to be pretty complicated. You had to generate tokens with the proper scope then use one token to register your flows and another to run your flows. Behind the scenes, the token you provided was generating short-lived tokens that were actually used for authentication, leading to a complex dance in the Prefect Client object. API keys solve all of these problems. An API key is tied directly to an account and has all of the permissions of that account. It can be used for both registering and running flows. It does not require additional token generation. We've added a whole new suite of CLI commands for managing authentication with API keys while retaining backwards compatibility with tokens.

Getting started with an API key

If you've never used a token, getting going with an API key is simple. Head to the UI API key page and create a new key. Then open up a terminal and run prefect auth login --key "<YOUR-KEY>"

Congrats! You're logged in. Your key has now been saved to ~/.prefect/auth.toml and will be used for all future operations. Try starting an agent with prefect agent local start.

If your user belongs to multiple tenants, you can view them with prefect auth list-tenants and switch tenants with prefect auth switch-tenants. When you switch tenants, the auth.toml file will be updated so all future Cloud interactions use the new tenant.

If you want to remove your key from your machine, run prefect auth logout.

Transitioning from a token to an API key

If you're using a token right now, we encourage you to switch over! Run prefect auth logout to remove your 'short-lived' token then run prefect auth logout again to remove your API token from your machine entirely. We care a lot about backwards compatibility and didn't want to break the old behavior of the logout command for tokens, which is why it takes two goes to remove it entirely.

You may get some warnings that your API token has been set elsewhere when you try using the CLI to log out. This means you've set your auth token in the config.toml at prefect.cloud.auth_token or as an environment variable PREFECT__CLOUD__AUTH_TOKEN. Just remove that config setting and you're logged out.

Since API keys can be used to run agents, you can also remove prefect.cloud.agent.auth_token or PREFECT__CLOUD__AGENT__AUTH_TOKEN. You won't be needing them anymore.
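The two setting pairs above follow a visible naming pattern: the environment variable is the config path uppercased, with double underscores in place of dots. A minimal sketch of that mapping, useful when hunting for leftover token settings, might look like the following. The `env_var_to_config_path` helper is hypothetical and only mirrors the pattern seen in this post; Prefect's own config loader is not shown here and may apply further rules.

```python
def env_var_to_config_path(name: str) -> str:
    """Translate a PREFECT__SECTION__KEY variable name into a dotted config path."""
    prefix = "PREFECT__"
    if not name.startswith(prefix):
        raise ValueError(f"{name!r} does not start with {prefix!r}")
    # Double underscores separate nesting levels; single underscores stay in the key
    parts = name[len(prefix):].split("__")
    return ".".join(["prefect"] + [part.lower() for part in parts])


for variable in ("PREFECT__CLOUD__AUTH_TOKEN", "PREFECT__CLOUD__AGENT__AUTH_TOKEN"):
    print(variable, "->", env_var_to_config_path(variable))
```

Splitting on the double underscore is what keeps `AUTH_TOKEN` together as a single key rather than breaking it into two levels.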

Now that you've purged your token, you can log in with an API key as described above.

Service accounts

When running agents in production or registering flows in CI, it doesn't make sense to tie API interactions to your specific user account. For this, we've introduced service accounts which are scoped to a single tenant. You can create service accounts and an associated API key from the UI.

Then, in your CI job you can set an environment variable PREFECT__CLOUD__API_KEY="<API-KEY>" or use the login CLI as described above.

In conclusion

We've overhauled the Prefect Core Python API for running and inspecting flow runs and exposed some powerful new patterns. In the process, we rewrote most of the flow run documentation. We've also simplified authentication while providing more powerful access control. We can't fit everything in a blog post, so be sure to check out the release changelog to see everything else.

. . .

We're excited to see what you can do with these new features and we're always looking for more feedback so we can continue to make the best orchestration tool around!

Please continue reaching out to us — we appreciate the opportunity to work with all of you!

Happy Engineering!

— The Prefect Team

@mashun4ek mashun4ek commented Jun 30, 2021

Awesome work! 👏