Skip to content

Instantly share code, notes, and snippets.

@dstandish
Created June 13, 2024 21:59
Show Gist options
  • Save dstandish/383b1f89e6c8e9165ddbb8a381b4cc57 to your computer and use it in GitHub Desktop.
Save dstandish/383b1f89e6c8e9165ddbb8a381b4cc57 to your computer and use it in GitHub Desktop.
task instance data generator
"""
Airflow task instance data generator.
Takes two arguments:
* unit: days / weeks / hours / minutes etc
* num: an arbitrary suffix — the generated data is the same, only the dag name changes
Example:
python /Users/dstandish/code/async-ssh-operator/synthesize_data.py days 1
* This will create one dag run per day for the time period. (hardcoded at 2.5 yrs)
* that means: ~18k task runs
* dag will be named fake_dag_days_1
python /Users/dstandish/code/async-ssh-operator/synthesize_data.py minutes 2
* This will create one dag run per minute for the time period. (hardcoded at ~2.5 yrs)
* that means: ~27M task runs
* dag will be named fake_dag_minutes_2
Tasks per dag hardcoded to 20
Creates tasks in parallel in a process pool with workers hardcoded to 10.
Make sure you have some ice to dump on your laptop.
"""
from __future__ import annotations
import os
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "postgresql://airflow:Abc123456@localhost/testing"
import sys
from concurrent.futures import ProcessPoolExecutor
import pendulum
from airflow import DAG
from airflow.models import DagRun
from airflow.operators.bash import BashOperator
# --- CLI arguments ---------------------------------------------------------
# unit: pendulum interval name controlling dag run frequency
#       ("days", "weeks", "hours", "minutes", ...)
# num:  arbitrary integer suffix so repeated generations get distinct dag ids
unit = sys.argv[1]
num = int(sys.argv[2])
print(f"starting data generation with {unit=} and {num=}")

# One synthetic dag with 20 identical bash tasks.  The tasks are never
# actually executed by this script -- we only synthesize dag run / task
# instance rows -- so the bash command is just a placeholder.
with DAG(dag_id=f"fake_dag_{unit}_{num}") as dag1:
    for tnum in range(20):
        start_task = BashOperator(
            task_id=f"task_{tnum}",
            bash_command="hi",  # placeholder; never executed by this generator
        )
# Register the dag in the metadata db from the parent process only: worker
# processes re-import this module and must not repeat the sync.
if __name__ == "__main__":
    dag1.bulk_sync_to_db([dag1])

# Time window covered by the synthetic dag runs (~2.5 years).
start_date = pendulum.datetime(2022, 1, 1, tz="UTC")
end_date = pendulum.datetime(2024, 6, 13, tz="UTC")
def get_dates():
    """Yield one timestamp per `unit` from start_date up to, but excluding, end_date."""
    span = end_date - start_date
    for stamp in span.range(unit):
        if stamp == end_date:
            # range() includes the endpoint; stop just before it.
            break
        yield stamp
def run(dt):
    """Worker entry point: synthesize one dag run (and its TIs) at `dt`.

    The session helper is imported lazily so every spawned worker process
    builds its own engine/session rather than inheriting one.
    """
    from airflow.utils.session import create_session

    with create_session() as session:
        create_tis(dt=dt, session=session)
        session.commit()
    # Progress heartbeat: print once per simulated day (midnight timestamps).
    if dt.hour == 0 and dt.minute == 0:
        print(dt)
def create_tis(dt, session):
    """Create one "success" dag run at `dt` and stamp its task instances.

    Every task instance is given a fake 30-second duration starting at `dt`.
    NOTE(review): only the dag run is marked "success"; ti.state is left
    untouched here -- confirm that is intended for the generated data.
    """
    dr: DagRun = dag1.create_dagrun(state="success", execution_date=dt, run_type="manual", session=session)
    finished = dt.add(seconds=30)  # invariant across the loop; computed once
    for ti in dr.task_instances:
        ti.start_date = dt
        ti.end_date = finished
if __name__ == "__main__":
    # Fan the timestamps out to a process pool (workers hardcoded to 10).
    # Consume the map iterator: ProcessPoolExecutor.map yields results
    # lazily, and an exception raised in a worker is only re-raised when
    # its result is retrieved -- discarding the iterator (as the original
    # did) silently swallows every worker failure.
    with ProcessPoolExecutor(max_workers=10) as pool:
        for _ in pool.map(run, get_dates()):
            pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment