Skip to content

Instantly share code, notes, and snippets.

@anna-geller
Created January 5, 2022 13:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anna-geller/69a38677b8ab56c2da2d80eeea9ea1c4 to your computer and use it in GitHub Desktop.
Save anna-geller/69a38677b8ab56c2da2d80eeea9ea1c4 to your computer and use it in GitHub Desktop.
import prefect
from prefect import task, Flow
from prefect.tasks.notifications import SlackTask
import time
from typing import cast
def post_to_slack_on_cancellation(flow, old_state, new_state):
    """Flow state handler that posts a Slack message when a run is cancelled.

    For any state other than ``Cancelled`` nothing is sent. When the new
    state is ``Cancelled``, a message naming the flow and flow run (both read
    from ``prefect.context``) is sent through ``SlackTask``.

    Args:
        flow: The flow the handler is attached to (unused here).
        old_state: The state being left (unused here).
        new_state: The state being entered.

    Returns:
        ``new_state``, unmodified, in every case.
    """
    # Guard clause: only a cancellation triggers the notification.
    if not isinstance(new_state, prefect.engine.state.Cancelled):
        return new_state

    outcome = new_state.result
    # An exception result is shown via repr() inside a code fence;
    # otherwise the state's own message is used as-is.
    detail = (
        "```{}```".format(repr(outcome))
        if isinstance(outcome, Exception)
        else cast(str, new_state.message)
    )

    slack_text = (
        f"The flow `{prefect.context.flow_name}` was cancelled "
        f"in a flow run `{prefect.context.flow_run_id}` "
        f"with a message: `{detail}`"
    )

    # Placeholder: any resource cleanup for a cancelled run belongs here.
    notifier = SlackTask(message=slack_text)
    notifier.run()
    return new_state
@task(log_stdout=True)
def hello_world():
    """Print a notice and then sleep for 30 seconds.

    The pause gives time to cancel the flow run from the UI while the
    task is still in progress.
    """
    notice = "Doing something and giving you 30 seconds to cancel the flow run from the UI..."
    print(notice)

    pause_seconds = 30
    time.sleep(pause_seconds)
# Build the "canc" flow with the cancellation handler attached as a
# flow-level state handler.
with Flow(
    "canc",
    state_handlers=[post_to_slack_on_cancellation],
) as flow:
    # The single task is invoked inside the flow's `with` block.
    hw = hello_world()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment