Created
June 13, 2024 21:59
-
-
Save dstandish/383b1f89e6c8e9165ddbb8a381b4cc57 to your computer and use it in GitHub Desktop.
task instance data generator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Airflow task instance data generator. | |
Takes two arguments: | |
* unit: days / weeks / hours / minutes etc | |
* num: just to do the same thing with different name dag | |
Example: | |
python /Users/dstandish/code/async-ssh-operator/synthesize_data.py days 1 | |
* This will create one dag run per day for the time period. (hardcoded at 2.5 yrs) | |
* that means: ~18k task runs | |
* dag will be named fake_dag_days_1 | |
python /Users/dstandish/code/async-ssh-operator/synthesize_data.py minutes 2 | |
* This will create one dag run per day for the time period. (hardcoded at ~2.5 yrs) | |
* that means: ~27M task runs | |
* dag will be named fake_dag_minutes_2 | |
Tasks per dag hardcoded to 20 | |
Creates tasks in parallel in a process pool with workers hardcoded to 10. | |
Make sure you have some ice to dump on your laptop. | |
""" | |
from __future__ import annotations

import os

# IMPORTANT: must be set BEFORE any airflow import -- airflow reads its
# configuration (including the SQLAlchemy connection string) at import time.
# NOTE(review): credentials are hardcoded; acceptable for a throwaway local
# benchmark database only.
os.environ["AIRFLOW__DATABASE__SQL_ALCHEMY_CONN"] = "postgresql://airflow:Abc123456@localhost/testing"

import sys
from concurrent.futures import ProcessPoolExecutor

import pendulum

from airflow import DAG
from airflow.models import DagRun
from airflow.operators.bash import BashOperator

# CLI args (see module docstring): `unit` is a pendulum range unit
# ("days", "hours", "minutes", ...); `num` only disambiguates the dag name.
unit = sys.argv[1]
num = int(sys.argv[2])
print(f"starting data generation with {unit=} and {num=}")
with DAG(dag_id=f"fake_dag_{unit}_{num}") as dag1: | |
for tnum in range(20): | |
start_task = BashOperator( | |
task_id=f"task_{tnum}", | |
bash_command="hi", # enclosed in single quotes to avoid "template" rendering | |
) | |
if __name__ == "__main__": | |
dag1.bulk_sync_to_db([dag1]) | |
# Fixed ~2.5-year window the synthetic runs are spread across.
start_date = pendulum.datetime(2022, 1, 1, tz="UTC")
end_date = pendulum.datetime(2024, 6, 13, tz="UTC")


def get_dates():
    """Yield one datetime per `unit`-sized step in [start_date, end_date).

    pendulum's period range is inclusive of both endpoints, so the final
    end_date value is filtered out to keep the interval half-open.
    """
    span = end_date - start_date
    yield from (point for point in span.range(unit) if point != end_date)
def run(run_date):
    """Worker entry point: persist the task instances for one run date.

    Opens its own session (this runs in a separate process), commits the
    generated rows, and prints a progress line once per simulated day.
    """
    from airflow.utils.session import create_session

    with create_session() as worker_session:
        create_tis(dt=run_date, session=worker_session)
        worker_session.commit()
    # Midnight marker = coarse progress indicator for fine-grained units.
    if (run_date.hour, run_date.minute) == (0, 0):
        print(run_date)
def create_tis(dt, session):
    """Create one successful manual dag run at `dt` and stamp its task instances.

    NOTE(review): assumes `dag1.create_dagrun` also materializes the run's
    task instances (so `dr.task_instances` is non-empty) -- confirm for the
    target Airflow version; in some versions TIs are created later by
    DagRun.verify_integrity / the scheduler.
    """
    dr: DagRun = dag1.create_dagrun(state="success", execution_date=dt, run_type="manual", session=session)
    for ti in dr.task_instances:
        # Fixed 30-second "duration" for every TI. The TIs are attached to
        # `session`, so the caller's commit persists these mutations.
        ti.start_date = dt
        ti.end_date = dt.add(seconds=30)
if __name__ == "__main__": | |
with ProcessPoolExecutor(max_workers=10) as pool: | |
pool.map(run, get_dates()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment