Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.engine.state import Skipped
import time
@task(log_stdout=True)
def hello_world():
    """Print a short notice and then block for six minutes.

    The task is registered with ``log_stdout=True`` so the ``print`` output
    is handed to Prefect's stdout logging. The long sleep presumably exists
    to keep the run active long enough for a second run of the same flow to
    overlap with it.
    """
    nap_seconds = 360  # six minutes
    print("Sleeping...")
    time.sleep(nap_seconds)
def skip_if_running_handler(obj, old_state, new_state):
    """Skip this flow run when another run of the same flow is in progress.

    When the proposed state is a running state, the backend is queried over
    GraphQL for runs of the current flow whose state is "Running". If the
    query returns any, the transition is replaced by a ``Skipped`` state;
    in every other case the proposed state is passed through unchanged.

    Args:
        obj: The object the handler is attached to (unused here).
        old_state: The state being left (unused here).
        new_state: The state about to be entered.

    Returns:
        ``new_state`` itself, or ``Skipped(message)`` when at least one
        active run of the same flow was found.
    """
    # Only the transition into a running state is of interest.
    if not new_state.is_running():
        return new_state

    # Read the flow id with .get() instead of attribute access so that a
    # context without this key does not raise AttributeError from inside
    # the handler. Without a flow id there is nothing to query for, so the
    # run is allowed to proceed.
    # NOTE(review): flow_id appears to be populated only for
    # backend-orchestrated runs -- confirm against the Prefect context docs.
    flow_id = prefect.context.get("flow_id")
    if flow_id is None:
        return new_state

    # Same query as before; only layout whitespace (insignificant to
    # GraphQL) differs.
    query = """
        query($flow_id: uuid) {
            flow_run(
                where: {_and: [{flow_id: {_eq: $flow_id}},
                               {state: {_eq: "Running"}}]}
            ) {
                name
                state
                start_time
            }
        }
    """
    response = Client().graphql(query=query, variables=dict(flow_id=flow_id))
    active_flow_runs = response["data"]["flow_run"]
    if not active_flow_runs:
        return new_state

    message = "Skipping this flow run since there are already some flow runs in progress"
    prefect.context.get("logger").info(message)
    return Skipped(message)
# Build the flow. The state handler is attached at the flow level, so it is
# consulted on the flow run's own state transitions.
with Flow(
    name="skip_if_running",
    state_handlers=[skip_if_running_handler],
) as flow:
    # Calling the task inside the Flow context adds it to the flow.
    hello_task = hello_world()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment