Skip to content

Instantly share code, notes, and snippets.

@asandeep
Created October 27, 2020 02:57
Show Gist options
  • Save asandeep/bc48f2b3af1f12259ba55d2af5ff5fca to your computer and use it in GitHub Desktop.
Save asandeep/bc48f2b3af1f12259ba55d2af5ff5fca to your computer and use it in GitHub Desktop.
Flow to clean up old Prefect flow runs.
import pendulum
import prefect
from dynaconf import settings
from prefect import schedules
from prefect.schedules import clocks
# Cron expression that schedules the cleanup flow. The five fields are
# minute, hour, day-of-month, month and day-of-week, so "0 0 * * 0" fires at
# 00:00 every Sunday (UTC, per the original author's note).
DATA_CLEANUP_SCHEDULE_CRON_STRING = "0 0 * * 0"
@prefect.task
def get_expired_flow_runs():
    """Collect the IDs of flow runs that fall outside the retention window.

    The cutoff is the start of the UTC day ``settings.RETENTION_DAYS`` days
    before now. A flow run is selected when its state is not "Scheduled" and
    its ``updated`` timestamp is earlier than that cutoff.

    Returns:
        A list with the ``id`` of every selected flow run.
    """
    # Start of the UTC day RETENTION_DAYS ago, rendered as a datetime string
    # so it can be passed as the $updated_before GraphQL variable.
    cutoff_day = pendulum.now("UTC").subtract(days=settings.RETENTION_DAYS)
    updated_before = cutoff_day.start_of("day").to_datetime_string()

    # Nested dict/set form of the GraphQL query; only the "id" field is
    # requested for each matching flow run.
    flow_run_selector = """
flow_run(where: {
_and: {
state: {_neq: "Scheduled"},
updated: {_lt: $updated_before}
}
})
"""
    expired_runs_query = {
        "query($updated_before: timestamptz)": {flow_run_selector: {"id"}}
    }

    log = prefect.context.get("logger")
    api = prefect.Client(api_server=settings.PREFECT_API_ENDPOINT)
    result = api.graphql(
        expired_runs_query, variables={"updated_before": updated_before}
    )
    expired_runs = result.data.flow_run
    log.info("Expired flow run count: %s", len(expired_runs))

    run_ids = [run.id for run in expired_runs]
    log.debug("Expired flow run ids: %s", run_ids)
    return run_ids
@prefect.task
def delete_flow_run(flow_run_id):
    """Delete a single flow run through the Prefect GraphQL API.

    Sends the ``delete_flow_run`` mutation for the given ID and logs the
    response data at debug level. Nothing is returned.

    Per the original author's note, deleting only the flow run is expected to
    be enough because database cascades remove the related objects; see
    https://prefect-community.slack.com/?redir=%2Farchives%2FCL09KU1K7%2Fp1598535130019500

    Args:
        flow_run_id: ID of the flow run to delete.
    """
    delete_mutation = """
mutation($input: delete_flow_run_input!) {
delete_flow_run(input: $input) {
success
}
}
"""
    mutation_variables = {"input": {"flow_run_id": flow_run_id}}

    log = prefect.context.get("logger")
    api = prefect.Client(api_server=settings.PREFECT_API_ENDPOINT)
    result = api.graphql(delete_mutation, variables=mutation_variables)
    log.debug("Flow id: %s deletion success: %s", flow_run_id, result.data)
# Flow definition: fetch the expired flow run IDs, then delete each of them.
# It is scheduled by a single cron clock built from
# DATA_CLEANUP_SCHEDULE_CRON_STRING.
with prefect.Flow(
    name="cleanup_expired_data",
    schedule=schedules.Schedule(
        clocks=[clocks.CronClock(cron=DATA_CLEANUP_SCHEDULE_CRON_STRING)]
    ),
) as cleanup_expired_data:
    # The Prefect API has no bulk deletion for flow runs yet, so the delete
    # task is mapped over the IDs and removes them one at a time until the
    # issue below is resolved.
    # @see: https://github.com/PrefectHQ/server/issues/62
    expired_flow_runs = get_expired_flow_runs()
    delete_flow_run.map(expired_flow_runs)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment