Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Retries when processing files in Prefect 2.0
import os
import random
import requests
from prefect import task, flow
from prefect.orion.schemas.states import Completed, Failed
from prefect.futures import PrefectFuture
from typing import List
@task(retries=3, retry_delay_seconds=2)
def process_file(file_name: str):
return random.choice(
[
Completed(message=f"File {file_name} processed successfully ✅"),
Failed(message=f"File {file_name} failed ❌"),
]
)
def send_slack_alert_on_failure(task_future: PrefectFuture, file_name: str):
task_future.wait()
if task_future.get_state().is_failed():
name_ = task_future.task_run.name
id_ = task_future.task_run.flow_run_id
requests.post(
os.environ["SLACK_WEBHOOK_URL"],
json={
"text": f"File {file_name} failed ❌ in task `{name_}` in a flow run `{id_}`."
},
)
@flow
def process_files(files_to_process: List[str]):
for file in files_to_process:
file_state = process_file(file)
send_slack_alert_on_failure(file_state, file)
if __name__ == "__main__":
files_param = [f"file_{i}" for i in range(1, 20)]
process_files(files_param)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment