Prefect 0.15.0: Improved flow run creation and inspection
We’re excited to announce the release of Prefect 0.15.0! This release is the result of months of work to improve the interface for creating and inspecting flow runs. 0.15.0 contains a new command line interface for creating flow runs as well a suite of objects for inspecting flow runs without writing GraphQL queries.
For the full list of enhancements and features, check out the release changelog.
. . .
The core of Prefect is orchestrating the execution of your workflows. When using Prefect Cloud or Server, you indicate a workflow should be run by creating a "flow run". A flow run can be created from the UI, via a GraphQL call, from a task in another flow run, or using the CLI. Continuous improvements have been made to launching flow runs from the the web UI; log levels can be easily adjusted, configuration can be modified per run, and JSON flow parameters have syntax highlighting. With the UI, we can redesign something easily because you're not relying on the interface programatically. With the Prefect Python library, we're careful to limit our redesigns until there's strong evidence we are correct since it requires changes to our programatic interface. In 0.15.0, we've added
prefect run
-- a new CLI for running flowscreate_flow_run
andget_task_run_result
tasks -- to create subflows and retrieve data from themFlowRunView
,TaskRunView
,FlowView
, andTenantView
objects -- to pull data from the backend without writing GraphQL- simpler authentication using API keys
and more, but there's only so much we can fit in a blog post!
A new CLI for running flows
We created a new CLI for running flows: prefect run
.
prefect run
is designed to make the transition from local execution to running with a Cloud or Server backend seamless. Previously, when developing, you needed to define your flow then call flow.run()
at the bottom; then, to test the flow, you'd run python my_flow_file.py
; then, when the flow is ready to be pushed to Cloud, you'd change your flow.run()
call to flow.register("project")
and go to the UI to start a flow run. With prefect register
(introduced in 0.14.13) and prefect run
, you can drop this cruft from your flow. Limiting your flow files to the definition of your flow and using the CLI to perform actions on that definition means you don't have to edit your flow to change how it is run.
prefect run
will take an import or file path to run a flow locally; this replaces using flow.run()
. For example, the following will run the file, extract the flow object, and call flow.run()
for you:
$ prefect run -p my_flow_file.py
To transition this flow from local to Cloud, we can register it with an interface that feels the same; this replaces flow.register()
:
$ prefect register -p my_flow_file.py --project example
Before we continue, let's switch to a real flow. We've added a "Hello World" flow to Prefect to make getting started really easy.
First, try running the flow locally:
$ prefect run -m prefect.hello_world
Retrieving local flow... Done
Running flow locally...
└── 11:21:52 | INFO | Beginning Flow run for 'hello-world'
└── 11:21:52 | INFO | Task 'name': Starting task run...
└── 11:21:52 | INFO | Task 'name': Finished task run for task with final state: 'Success'
└── 11:21:52 | INFO | Task 'capitalize': Starting task run...
└── 11:21:52 | INFO | Task 'capitalize': Finished task run for task with final state: 'Success'
└── 11:21:52 | INFO | Task 'say_hello': Starting task run...
└── 11:21:52 | INFO | Hello World
└── 11:21:52 | INFO | Task 'say_hello': Finished task run for task with final state: 'Success'
└── 11:21:52 | INFO | Flow run SUCCESS: all reference tasks succeeded
Flow run succeeded!
Then, register the flow:
# You may need to create a project first with `prefect create project 'example'`
$ prefect register -m prefect.hello_world --project example
Collecting flows...
Processing 'prefect.hello_world':
Building `Module` storage...
Registering 'hello-world'... Done
└── ID: 8a896c14-07ac-4538-bed1-162e188d780f
└── Version: 1
======================== 1 registered ========================
Now, a flow run can be created using the prefect run
CLI:
$ prefect run --name "hello-world" --watch
Looking up flow metadata... Done
Creating run for flow 'hello-world'... Done
└── Name: electric-mandrill
└── UUID: 55955649-f2e7-4d43-938a-f5b08974bab9
└── Labels: []
└── Parameters: {}
└── Context: {}
└── URL: https://cloud.prefect.io/prefect-engineering/flow-run/55955649-f2e7-4d43-938a-f5b08974bab9
Watching flow run execution...
└── 11:22:38 | INFO | Entered state <Scheduled>: Flow run scheduled.
└── 11:22:45 | INFO | Entered state <Submitted>: Submitted for execution
└── 11:22:45 | INFO | Submitted for execution: Job prefect-job-9d1c807c
└── 11:22:47 | INFO | Beginning Flow run for 'hello-world'
└── 11:22:47 | INFO | Entered state <Running>: Running flow.
└── 11:22:47 | INFO | Task 'name': Starting task run...
└── 11:22:47 | INFO | Task 'name': Finished task run for task with final state: 'Success'
└── 11:22:48 | INFO | Task 'capitalize': Starting task run...
└── 11:22:48 | INFO | Task 'capitalize': Finished task run for task with final state: 'Success'
└── 11:22:48 | INFO | Task 'say_hello': Starting task run...
└── 11:22:48 | INFO | Hello World
└── 11:22:48 | INFO | Task 'say_hello': Finished task run for task with final state: 'Success'
└── 11:22:48 | INFO | Flow run SUCCESS: all reference tasks succeeded
└── 11:22:48 | INFO | Entered state <Success>: All reference tasks succeeded.
Flow run succeeded!
What's with the --watch
flag? When used with a backend, prefect run
will typically exit immediately after creating a flow run since the execution of the flow run is managed by an agent. Above, we opted to watch the flow run which will stream logs and state changes from the flow run to your machine. This provides some instant feedback about how the flow run is going. Of course, we display a link to monitor the flow run from the UI and you can see all of the logs there as well.
What about the old command? prefect run flow
will stick around for compatibility, but is deprecated now. Creating a new command lets us change behavior and arguments without worrying about backwards compatibility. This naming and deprecation matches the prefect register
command, which replaced prefect register flow
.
Agent warnings
A common question is: Why is my flow run stuck in a 'Scheduled' state; why hasn't it started?
When prefect run --watch
is used, we can help answer this question. If the flow run does not enter a 'Running' state 15 seconds after being created, we will investigate. Flow runs are matched with agents by "labels". The labels on an agent must include all of the labels on the flow run for the agent to pick up the flow run. We pull all of your agents from the backend and compare your labels to the flow run's labels to determine if there are any matching agents. If not, we'll tell you to start an agent with the required set of labels. If there is an agent with matching labels, we'll check the last time the agent contacted the backend to see if it's healthy and warn you if it is not. Combined with the new agent screens in the UI, we're improving communication where things can go wrong so you can quickly identify a solution and focus on your engineering tasks at hand.
Agentless execution
When you're hacking away at a flow locally, it can feel a bit onerous to start an agent to execute your flow. In 0.15.0, we have introduced a new concept of "agentless flow run execution". Using the prefect run
CLI, you can execute a flow directly with the rich set of features enabled by communicating with the backend API. For example, you'll be able to see your flow run from the UI, your Cloud concurrency limits will be respected, and you'll have a context populated with real object ids. At the same time, you'll be able to set breakpoints in your tasks and debug your flow locally.
Using our hello-world
flow from above, pass the --execute
flag to prefect run
to execute the flow run without an agent.
$ prefect run --name "hello-world" --execute
This works by:
- Creating a flow run for you in the backend with an extra unique label so an agent does not pick up the flow run
- Creating a subprocess including environment variables from your
RunConfig
(other infrastructure settings are ignored) - The subprocess pulls the flow from storage
- The flow is executed with reporting to the backend
Agents add a layer to Prefect in which we help you manage the infrastructure your flow is going to be deployed on. Since the agent is removed from the picture here, you are taking full ownership of the infrastructure your flow runs on. If you want the flow to run in a container, you must set up the container yourself then call this command. This gives deep control over the execution environment and scheduling of flow runs to the users that need it.
Sub-flow result passing
Since we introduced the ability to launch flows from a flow, users have been asking for a first-class way to pass task results between flows. In 0.15.0, we're releasing new tasks to create sub-flows and pull results from them with a very simple interface.
The existing task to create sub-flows, StartFlowRun
, provides a wait
flag which changes its return value from a flow run id to a state--this makes it hard to compose with other tasks. A much simpler create_flow_run
task was introduced to create a child flow run. This task always returns a flow run id immediately after creation. We then added a get_task_run_result(flow_run_id: str, task_slug: str)
task which will retrieve the result from the task in the given flow run.
For example, we can pull a result from a child flow into a parent flow then act on that data:
from typing import List
from prefect import Flow, Parameter, task
from prefect.engine.results import LocalResult
from prefect.tasks.prefect.flow_run import create_flow_run, get_task_run_result
@task(result=LocalResult())
def create_some_data(length: int):
return list(range(length))
with Flow("child") as child_flow:
data_size = Parameter("data_size", default=5)
data = create_some_data(data_size)
@task(log_stdout=True)
def transform_and_show(data: List[int]) -> List[int]:
print(f"Got: {data!r}")
new_data = [x + 1 for x in data]
print(f"Created: {new_data!r}")
return new_data
with Flow("parent") as parent_flow:
child_run_id = create_flow_run(
flow_name=child_flow.name, parameters=dict(data_size=10)
)
child_data = get_task_run_result(child_run_id, "create_some_data-1")
transform_and_show(child_data)
To run this example, save it to a file parent-child-flow.py
. Then we can register and run the parent
$ prefect register -p parent-child-flow.py --project example
$ prefect run --name "parent" --execute
❯ prefect run --name 'parent' --execute
Looking up flow metadata... Done
Creating run for flow 'parent'... Done
└── Name: quixotic-orangutan
└── UUID: 8b17ffed-c70e-4a20-a3b4-efb2eebba9e9
└── Labels: ['agentless-run-2e92a4fd']
└── Parameters: {}
└── Context: {}
└── URL: https://cloud.prefect.io/prefect-engineering/flow-run/8b17ffed-c70e-4a20-a3b4-efb2eebba9e9
Executing flow run...
└── 13:19:07 | INFO | Creating subprocess to execute flow run...
└── 13:19:08 | INFO | Beginning Flow run for 'parent'
└── 13:19:09 | INFO | Task 'create_flow_run': Starting task run...
└── 13:19:10 | INFO | Creating flow run 'quixotic-orangutan-child' for flow 'child'...
└── 13:19:10 | INFO | Created flow run 'quixotic-orangutan-child': https://cloud.prefect.io/prefect-engineering/flow-run/5eed1147-9247-48da-86e0-c7cee995cea9
└── 13:19:11 | INFO | Task 'create_flow_run': Finished task run for task with final state: 'Success'
└── 13:19:11 | INFO | Task 'get_task_run_result': Starting task run...
└── 13:19:22 | INFO | Task 'get_task_run_result': Finished task run for task with final state: 'Success'
└── 13:19:23 | INFO | Task 'transform_and_show': Starting task run...
└── 13:19:24 | INFO | Got: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
└── 13:19:24 | INFO | Created: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
└── 13:19:24 | INFO | Task 'transform_and_show': Finished task run for task with final state: 'Success'
└── 13:19:25 | INFO | Flow run SUCCESS: all reference tasks succeeded
Flow run succeeded!
Inspection of flow runs
The UI provides a wonderful view into your flow runs with minimal effort, we want to bring our Python library in-line with this ease. 0.15.0 creates a prefect.backend
module with Python objects that provide views of the backend without requiring you to write any GraphQL queries. GraphQL is a powerful view into the Prefect backend, allowing you to retrieve exactly the data you want from nested objects, but often you just want to pull a typical set of data for a single object. Since flow runs are at the center of what Prefect does, we've focused on making it simple to pull useful data for a flow run.
The FlowRunView
lets you pull the most recent information about a flow run from the backend. For example:
from prefect.backend import FlowRunView
# Create a new instance using an ID from the UI or from the output of the `prefect run` command
flow_run = FlowRunView.from_flow_run_id("4c0101af-c6bb-4b96-8661-63a5bbfb5596")
# You now have access to information about the flow run
flow_run.state # <Success: "All reference tasks succeeded.">
flow_run.labels # {"foo"}
flow_run.parameters # {"name": "Marvin"}
flow_run.run_config # LocalRun(...)
flow_run.states # [Scheduled(), Submitted(), Running(), Success()]
# You can also retrieve information about the flow
flow = flow_run.get_flow_metadata() # FlowView(...)
# Or about a task run
task_run = flow_run.get_task_run(task_slug='say_hello-1') # TaskRunView(...)
Notice that the FlowRunView
provides methods to access two related structures: FlowView
and TaskRunView
. FlowView
exposes the metadata that we store about a flow when it is registered. TaskRunView
exposes metadata for a single run of a task in your flow.
A FlowView
can also be directly instantiated with various lookup methods:
from prefect.backend import FlowView
flow_view = FlowView.from_flow_name(flow_name="hello-world")
# If a name is not unique, a project will need to be provided
flow_view = FlowView.from_flow_name(flow_name="hello-world", project_name="example")
# Or a lookup can be performed by id
flow_view = FlowView.from_flow_id(flow_id="8a896c14-07ac-4538-bed1-162e188d780f")
# The view provides information about the flow
flow_view.run_config # LocalRun(...)
flow_view.storage # Module(...)
flow.core_version # 0.15.0
flow.project_name # "example"
A TaskRunView
can also be instantiated directly, but it requires a flow run id so we recommend using FlowRunView().get_task_run(...)
:
from prefect import task
from prefect.backend import TaskRunView
# Presume we have a flow with the following task
@task
def foo():
return "foobar!"
# Instantiate a task run directly
task_run = TaskRunView.from_task_slug("foo-1", flow_run_id="<id>")
# As with the other views, we can inspect the task run
task_run.state # Success(...)
task_run.map_index # -1 (not mapped)
# We can also retrieve results
task_run.get_result() # "foobar!"
The most powerful feature of the TaskRunView
is its ability to pull results from their location on your infrastructure. This object does the heavy lifting of looking up the location of the result and pulling the data from it. It also handles mapped tasks which require the result to be pulled from many locations. Check out the task run view documentation for more details.
These backend objects were designed on conjunction with all of the features described above. We've leveraged their simple interface to build new, powerful features with minimal effort. Without these abstractions, the code for our new features would have a lot of repetition and complexity. We're excited to see what you can build with them as well!
API keys for simple auth
The authentication paradigm in Prefect used to be pretty complicated. You had to generate tokens with the proper scope then use one token to register your flows and another to run your flows. Behind the scenes, the token you provided was generating short-lived tokens that were actually used for authentication, leading to a complex dance in the Prefect Client
object. API keys solve all of these problems. An API key is tied directly to an account and has all of the permissions of that account. It can be used for both registering and running flows. It does not require additional token generation. We've added a whole new suite of CLI commands for managing authentication with API keys while retaining backwards compatibility with tokens.
Getting started with an API key
If you've never used a token, getting going with an API key is simple. Head to the UI API key page and create a new key. Then open up a terminal and run prefect auth login --key "<YOUR-KEY>"
Congrats! You're logged in. Your key has now been saved to ~/.prefect/auth.toml
and will be used for all future operations. Try starting an agent with prefect agent local start
.
If your user belongs to multiple tenants, you can view them with prefect auth list-tenants
and switch tenants with prefect auth switch-tenants
. When you switch tenants, the auth.toml
file will be updated so all future Cloud interactions use the new tenant.
If you want to remove your key from your machine, run prefect auth logout
.
Transitioning from a token to an API key
If you're using a token right now, we encourage you to switch over! Run prefect auth logout
to remove your 'short-lived' token then run prefect auth logout
again to remove your API token from your machine entirely. We care a lot about backwards compatibility and didn't want to break the old behavior of the logout command for tokens, which is why it takes two goes to remove it entirely.
You may get some warnings that your API token has been set elsewhere when you try using the CLI to logout. This means you've set your auth token in the config.toml
at prefect.cloud.auth_token
or as an environment variable PREFECT__CLOUD__AUTH_TOKEN
. Just remove that config setting and you're logged out.
Since API keys can be used to run agents, you can also remove prefect.cloud.agent.auth_token
or PREFECT__CLOUD__AGENT__AUTH_TOKEN
as well. You won't be needing them anymore.
Now that you've purged your token, you can login with an API key as described above.
Service accounts
When running agents in production or registering flows in CI, it doesn't make sense to tie API interactions to your specific user account. For this, we've introduced service accounts which are scoped to a single tenant. You can create service accounts and an associated API key from the UI.
Then, in your CI job you can set an environment variable PREFECT__CLOUD__API_KEY="<API-KEY>"
or use the login CLI as described above.
In conclusion
We've rehauled the Prefect Core Python API for running and inspecting flow runs and exposed some powerful new patterns. In the process, we rewrote most of the flow run documentation. We've also simplified authentication while providing more powerful access control. We can't fit everything in a blog post, so be sure to check out the release changelog to see everything else.
. . .
We're excited to see what you can do with these new features and we're always looking for more feedback so we can continue to make the best orchestration tool around!
Please continue reaching out to us — we appreciate the opportunity to work with all of you!
- join our Slack community for ad-hoc questions
- follow us on Twitter for updates
- attend our meetup events for contributors, focused on the internals of Prefect
- visit us on GitHub to open issues and pull requests
Happy Engineering!
— The Prefect Team
Awesome work!👏