Skip to content

Instantly share code, notes, and snippets.

@anna-geller
Created July 13, 2022 13:31
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anna-geller/c19f05e0549556f5ec79312d6cb08da5 to your computer and use it in GitHub Desktop.
Save anna-geller/c19f05e0549556f5ec79312d6cb08da5 to your computer and use it in GitHub Desktop.
Retries when processing files in Prefect 2.0
import os
import random
import requests
from prefect import task, flow
from prefect.orion.schemas.states import Completed, Failed
from prefect.futures import PrefectFuture
from typing import List
@task(retries=3, retry_delay_seconds=2)
def process_file(file_name: str):
return random.choice(
[
Completed(message=f"File {file_name} processed successfully ✅"),
Failed(message=f"File {file_name} failed ❌"),
]
)
def send_slack_alert_on_failure(task_future: PrefectFuture, file_name: str):
task_future.wait()
if task_future.get_state().is_failed():
name_ = task_future.task_run.name
id_ = task_future.task_run.flow_run_id
requests.post(
os.environ["SLACK_WEBHOOK_URL"],
json={
"text": f"File {file_name} failed ❌ in task `{name_}` in a flow run `{id_}`."
},
)
@flow
def process_files(files_to_process: List[str]):
for file in files_to_process:
file_state = process_file(file)
send_slack_alert_on_failure(file_state, file)
if __name__ == "__main__":
files_param = [f"file_{i}" for i in range(1, 20)]
process_files(files_param)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment