Skip to content

Instantly share code, notes, and snippets.

@anna-geller
Created December 10, 2021 15:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anna-geller/402592fa014600867461fda217d420ba to your computer and use it in GitHub Desktop.
Save anna-geller/402592fa014600867461fda217d420ba to your computer and use it in GitHub Desktop.
from datetime import timedelta
from prefect import task, Flow
from prefect.schedules.clocks import IntervalClock
from prefect.schedules import Schedule
import prefect
from prefect.client import Client
from prefect.engine.state import Failed
# Schedule with a single interval clock: one run every five minutes.
schedule = Schedule(
    clocks=[
        IntervalClock(interval=timedelta(minutes=5)),
    ],
)
def pause_schedule_if_last_flow_run_failed(obj, old_state, new_state):
    """State handler that deactivates the flow's schedule when a run fails.

    Written to be passed in a flow's ``state_handlers`` list. When the new
    state is a failure, it logs a message, sends a ``set_schedule_inactive``
    GraphQL mutation for the flow id found in ``prefect.context``, logs the
    response, and returns a ``Failed`` state carrying the same message.

    Args:
        obj: The object whose state is changing (unused here).
        old_state: The state being left (unused here).
        new_state: The state being entered; must provide ``is_failed()``.

    Returns:
        ``new_state`` unchanged when it is not a failure, otherwise a new
        ``Failed(message)`` state.
    """
    # Anything other than a failure passes through untouched.
    if not new_state.is_failed():
        return new_state

    logger = prefect.context.get("logger")
    message = "Failing this flow run and deactivating a schedule since the last flow run ended in Failure..."
    logger.info(message)

    client = Client()
    response = client.graphql(
        """
        mutation($flow_id: UUID!) {
          set_schedule_inactive(input: {flow_id: $flow_id}) {
            success
            error
          }
        }""",
        # GraphQL variables must be a mapping keyed by variable name; passing
        # the bare flow id string leaves $flow_id unbound in the mutation.
        variables={"flow_id": prefect.context.flow_id},
        # NOTE(review): presumably makes the client raise if the API response
        # contains errors -- confirm against the Prefect client docs.
        raise_on_error=True,
    )
    logger.info(response)
    return Failed(message)
@task(log_stdout=True)
def hello_world():
    """Print a fixed greeting to stdout.

    The task is declared with ``log_stdout=True``; presumably this makes the
    printed text show up in the task run's logs (confirm in the Prefect docs).
    """
    greeting = "hello world"
    print(greeting)
# Build the "hello" flow: it runs on `schedule`, and the state handler
# deactivates that schedule as soon as a flow run enters a failed state.
with Flow(
    name="hello",
    schedule=schedule,
    state_handlers=[pause_schedule_if_last_flow_run_failed],
) as flow:
    hw = hello_world()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment