"""Retries when processing files in Prefect 2.0."""
import os
import random
import requests
from prefect import task, flow
from prefect.orion.schemas.states import Completed, Failed
from prefect.futures import PrefectFuture
from typing import List
@task(retries=3, retry_delay_seconds=2)
def process_file(file_name: str):
    """Simulate processing one file by returning a randomly chosen state.

    The task is declared with ``retries=3`` and ``retry_delay_seconds=2``.

    Args:
        file_name: Name of the file; only used to build the state message.

    Returns:
        Either a ``Completed`` or a ``Failed`` state, picked at random.
    """
    # random.choice takes a single sequence, so both candidate states are
    # wrapped in a list (both are constructed eagerly; one is discarded).
    outcomes = [
        Completed(message=f"File {file_name} processed successfully ✅"),
        Failed(message=f"File {file_name} failed ❌"),
    ]
    return random.choice(outcomes)
def send_slack_alert_on_failure(task_future: PrefectFuture, file_name: str):
    """Post a Slack message if the task run behind ``task_future`` failed.

    Args:
        task_future: Future of the task run to inspect.
        file_name: Name of the file the task run was processing; included in
            the alert text.

    Raises:
        KeyError: If the run failed and ``SLACK_WEBHOOK_URL`` is not set.
    """
    # Wait for the run to finish so the state inspected below is the final
    # one rather than an intermediate state between retries.
    task_future.wait()
    if task_future.get_state().is_failed():
        name_ = task_future.task_run.name
        id_ = task_future.task_run.flow_run_id
        # NOTE(review): the webhook URL is not present in this file; it is
        # read from the SLACK_WEBHOOK_URL environment variable here. Confirm
        # the variable name against the deployment configuration.
        requests.post(
            os.environ["SLACK_WEBHOOK_URL"],
            json={
                "text": f"File {file_name} failed ❌ in task `{name_}` in a flow run `{id_}`."
            },
            # Without a timeout an unresponsive webhook would hang the flow.
            timeout=10,
        )
@flow
def process_files(files_to_process: List[str]):
    """Run ``process_file`` for every file and alert on each failure.

    Args:
        files_to_process: Names of the files to process, handled in order.
    """
    for file in files_to_process:
        # NOTE(review): the result is handed to a helper that treats it as a
        # PrefectFuture. Depending on the installed Prefect 2 version this may
        # need to be ``process_file.submit(file)`` - confirm.
        file_state = process_file(file)
        send_slack_alert_on_failure(file_state, file)
if __name__ == "__main__":
    # Build the demo input file_1 .. file_19 and run the flow on it.
    files_param = [f"file_{i}" for i in range(1, 20)]
    process_files(files_param)
