Skip to content

Instantly share code, notes, and snippets.

@BitTheByte
Last active March 5, 2022 06:22
Show Gist options
  • Save BitTheByte/cf7af4c0247ff6568d9298bc30270524 to your computer and use it in GitHub Desktop.
Save BitTheByte/cf7af4c0247ff6568d9298bc30270524 to your computer and use it in GitHub Desktop.
import datetime
import hashlib

import prefect
def cache_namespace(enable_cache=True, verbose=False, shared_caching=False):
    """Return a callable that builds a result-target path for a task run.

    The returned ``creator`` accepts the run context as keyword arguments
    and returns a string of the form
    ``{flow_name}/{flow_run_name}.{flow_run_id}/{task_name}.<suffix>.prefect_result``.

    Args:
        enable_cache: When false, ``creator`` always returns the default key,
            whose suffix is ``{flow_id}.{task_run_id}``.
        verbose: When true, the computed namespace and the values that were
            hashed are logged via ``logger.info``.
        shared_caching: When true, ``task_id`` is left out of the hashed
            values (``None`` is hashed in its place).

    Returns:
        The ``creator`` callable described above.
    """
    # TODO: Pray all Gods caching doesn't cause problems later

    # Context entries that must not influence the hash. Presumably these vary
    # between runs without changing the task's actual inputs -- verify against
    # the Prefect context before editing this list.
    ignored_keys = frozenset(
        (
            "config",
            "date",
            "today",
            "tomorrow",
            "yesterday",
            "today_nodash",
            "tomorrow_nodash",
            "yesterday_nodash",
            "scheduled_start_time",
            "task_run_version",
            "task_run_count",
            "checkpointing",
            "flow_run_version",
            "task_full_name",
            "task_tags",
            "map_index",
            "task_slug",
            "caches",
        )
    )

    def creator(
        flow_id,
        flow_run_id,
        flow_name,
        flow_run_name,
        task_name,
        task_run_id,
        logger,
        task_id,
        **kwargs,
    ):
        """Compute the target path for one task run from its context values."""
        default_key = (
            f"{flow_name}/{flow_run_name}.{flow_run_id}/"
            f"{task_name}.{flow_id}.{task_run_id}.prefect_result"
        )
        if not enable_cache:
            return default_key

        try:
            # Insertion order of the remaining kwargs is preserved, so the
            # string form (and therefore the hash) matches what popping the
            # ignored keys in place would have produced.
            cache_keys = {
                key: value
                for key, value in kwargs.items()
                if key not in ignored_keys
            } | {
                "extras": [
                    flow_id,
                    flow_run_name,
                    task_name,
                    None if shared_caching else task_id,
                ]
            }
            # md5 is only a fingerprint here, not a security primitive.
            unique_hash = hashlib.md5(
                str(cache_keys).encode(), usedforsecurity=False
            ).hexdigest()
            namespace = (
                f"{flow_name}/{flow_run_name}.{flow_run_id}/"
                f"{task_name}.{unique_hash}.prefect_result"
            )
            if verbose:
                logger.info(f"namespace: {namespace}, cache_keys: {cache_keys}")
            return namespace
        except Exception as exc:
            # Best effort: anything that goes wrong while fingerprinting the
            # context (e.g. a value whose repr raises) falls back to the
            # default key, but the failure is reported instead of hidden.
            logger.warning(
                "cache namespace generation failed (%r); using default key %s",
                exc,
                default_key,
            )
            return default_key

    return creator
def task(**kwargs):
    """Decorator factory that wraps a function in a cache-aware Prefect task.

    Usage: ``@task(...)`` (must be called; keyword arguments only). The
    decorated function is replaced by a ``FunctionTask`` subclass instance
    built with these defaults, each of which can be overridden via
    ``kwargs``:

    * ``max_retries``: 3
    * ``retry_delay``: 60 seconds
    * ``target``: the callable returned by ``cache_namespace()``

    Args:
        **kwargs: Forwarded to the task constructor; they take precedence
            over the defaults above.

    Returns:
        A decorator that takes the function and returns the task instance.
    """

    def inner(func):
        class CacheableFunctionTask(prefect.tasks.core.function.FunctionTask):
            # FunctionTask whose constructor pre-fills retry/target defaults.
            def __init__(self, **init_kwargs):
                defaults = {
                    "max_retries": 3,
                    # Prefect documents retry_delay as a timedelta; a bare
                    # int is expected to break when the retry is scheduled.
                    "retry_delay": datetime.timedelta(seconds=60),
                    "target": cache_namespace(),
                }
                # Caller-supplied values win over the defaults.
                super().__init__(**(defaults | init_kwargs))

        return CacheableFunctionTask(fn=func, **kwargs)

    return inner
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment