Skip to content

Instantly share code, notes, and snippets.

@pspeter
Created May 16, 2022 11:57
Show Gist options
  • Save pspeter/13c18ee6cf247a5ee2b3e3b2ee8a5470 to your computer and use it in GitHub Desktop.
Save pspeter/13c18ee6cf247a5ee2b3e3b2ee8a5470 to your computer and use it in GitHub Desktop.
Migrate airflow logs from v2.2.x to v2.3.x folder structure
import pandas as pd
from pathlib import Path
from airflow.settings import Session
log_dir = Path("/opt/airflow/logs")
session = Session()

# Select the needed columns by name instead of `SELECT *` plus a hand-written
# list of positional names. That list depended on the physical column order
# of `task_instance`, which can differ between databases and schema
# versions. It also had no "ts" column, so every lookup in get_run_id failed.
# NOTE(review): assumes the Airflow >= 2.2 schema, where `task_instance` has a
# `run_id` column and `dag_run` has `execution_date` -- confirm against the
# metadata DB being migrated.
# DISTINCT collapses rows that differ only in columns not selected here
# (e.g. several mapped-task rows for one task and run).
tis_iter = session.execute(
    "SELECT DISTINCT ti.dag_id, ti.task_id, ti.run_id, dr.execution_date "
    "FROM task_instance ti "
    "JOIN dag_run dr ON dr.dag_id = ti.dag_id AND dr.run_id = ti.run_id"
)
tis = pd.DataFrame(tis_iter, columns=["dag_id", "task_id", "run_id", "execution_date"])
# The rows are fully materialized in `tis`, so the connection can be released.
session.close()

# Normalize to UTC timestamps so matching does not depend on how the DB
# driver returns datetimes (naive, tz-aware with some offset, or strings).
# Naive values are treated as UTC.
tis["ts"] = pd.to_datetime(tis["execution_date"], utc=True)

# Hash index: one O(1) lookup per log folder instead of scanning `tis`.
_run_id_by_key = {
    (dag_id, task_id, ts): run_id
    for dag_id, task_id, ts, run_id in zip(
        tis["dag_id"], tis["task_id"], tis["ts"], tis["run_id"]
    )
}


def get_run_id(dag_id, task_id, ts):
    """Return the run_id of the task instance matching an old-style log folder.

    Args:
        dag_id: DAG id (the DAG folder name in the old log layout).
        task_id: Task id (the task folder name in the old log layout).
        ts: Name of the timestamp folder. It is parsed as a timestamp and
            compared in UTC against ``dag_run.execution_date``.
            NOTE(review): presumably the ISO-8601 ``{{ ts }}`` rendering,
            e.g. ``"2022-05-16T00:00:00+00:00"`` -- confirm.

    Returns:
        The matching ``run_id`` string.

    Raises:
        ValueError: if *ts* cannot be parsed as a timestamp.
        KeyError: if no task instance matches (dag_id, task_id, ts).
    """
    key = (dag_id, task_id, pd.to_datetime(ts, utc=True))
    return _run_id_by_key[key]
def _subdirs(path):
    """Return the immediate subdirectories of *path*, sorted, as a snapshot list."""
    return sorted(p for p in path.iterdir() if p.is_dir())


def run():
    """Move old-layout task log files under ``log_dir`` into the new layout.

    Walks ``<log_dir>/<dag_id>/<task_id>/<ts>/<file>`` and renames each file
    to ``<log_dir>/dag_id=<dag_id>/run_id=<run_id>/task_id=<task_id>/attempt=<file>``.
    The ``run_id`` is resolved with ``get_run_id``.

    Some files are left in place, with a message on stdout:
      * all files in a timestamp folder whose run_id cannot be resolved
        (``get_run_id`` raised ``LookupError`` or ``ValueError``);
      * any file whose target already exists, so an existing log is never
        overwritten.
    Non-directory entries at the dag/task/ts levels and non-file entries
    inside ts folders are ignored. The emptied old directories are not removed.
    """
    # Dirs already in the new layout start with "dag_id="; these two top-level
    # folders do not follow the dag/task/ts layout.
    non_dag_dirs = {"scheduler", "dag_processor_manager"}
    # _subdirs returns lists. That snapshots each listing, so the dirs created
    # in log_dir and the files moved out of ts dirs during the walk never
    # affect the iteration.
    dag_dirs = [
        d for d in _subdirs(log_dir)
        if not d.name.startswith("dag_id=") and d.name not in non_dag_dirs
    ]
    for dag_dir in dag_dirs:
        for task_dir in _subdirs(dag_dir):
            for ts_dir in _subdirs(task_dir):
                dag_id, task_id, ts = dag_dir.name, task_dir.name, ts_dir.name
                # The run_id depends only on the folder, so resolve it once per ts dir.
                try:
                    run_id = get_run_id(dag_id, task_id, ts)
                except (LookupError, ValueError) as err:
                    # Best effort: an unresolvable folder is reported and skipped,
                    # without swallowing KeyboardInterrupt or unrelated errors.
                    print(f"skipping {ts_dir}: cannot resolve run_id ({err!r})")
                    continue
                target_dir = (
                    log_dir / f"dag_id={dag_id}" / f"run_id={run_id}" / f"task_id={task_id}"
                )
                for f in sorted(p for p in ts_dir.iterdir() if p.is_file()):
                    target_file = target_dir / f"attempt={f.name}"
                    if target_file.exists():
                        # Path.rename silently replaces an existing file on POSIX.
                        print(f"skipping {f}: {target_file} already exists")
                        continue
                    target_dir.mkdir(parents=True, exist_ok=True)
                    f.rename(target_file)


run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment