Skip to content

Instantly share code, notes, and snippets.

@anna-geller
Created December 7, 2021 15:13
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anna-geller/8cf4b56a22e3944864dd1720673ce5c8 to your computer and use it in GitHub Desktop.
Save anna-geller/8cf4b56a22e3944864dd1720673ce5c8 to your computer and use it in GitHub Desktop.
import prefect
from prefect import task, Flow
from prefect.client import Client
from prefect.engine.state import Skipped
import time
@task(log_stdout=True)
def hello_world():
    """Print a short notice, then block for 360 seconds."""
    nap_seconds = 360  # six minutes
    print("Sleeping...")
    time.sleep(nap_seconds)
def skip_if_running_handler(obj, old_state, new_state):
    """State handler that skips a flow run when the same flow is already running.

    When ``new_state`` is a running state, the backend is queried through the
    GraphQL client for flow runs that belong to the current flow (taken from
    ``prefect.context``) and are in the "Running" state. If any are returned,
    a ``Skipped`` state is returned in place of ``new_state``.

    Args:
        obj: The object the handler is attached to (not used).
        old_state: The state being left (not used).
        new_state: The state being entered.

    Returns:
        ``Skipped(message)`` when at least one run of this flow is already in
        progress; otherwise ``new_state`` unchanged.
    """
    # Only the transition into a running state is of interest.
    if not new_state.is_running():
        return new_state

    # Use .get() rather than attribute access so that a missing flow_id
    # (presumably a run not started by a backend - verify for your setup)
    # does not raise from inside the handler; without a flow_id there is
    # nothing to compare against, so the run is allowed to proceed.
    flow_id = prefect.context.get("flow_id")
    if flow_id is None:
        return new_state

    client = Client()
    query = """
    query($flow_id: uuid) {
      flow_run(
        where: {_and: [{flow_id: {_eq: $flow_id}},
        {state: {_eq: "Running"}}]}
      ) {
        name
        state
        start_time
      }
    }
    """
    response = client.graphql(query=query, variables=dict(flow_id=flow_id))
    active_flow_runs = response["data"]["flow_run"]
    if not active_flow_runs:
        return new_state

    logger = prefect.context.get("logger")
    message = "Skipping this flow run since there are already some flow runs in progress"
    logger.info(message)
    return Skipped(message)
# Build the flow with the skip handler attached at flow level and a single
# hello_world task inside it.
with Flow(
    name="skip_if_running",
    state_handlers=[skip_if_running_handler],
) as flow:
    hello_task = hello_world()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment