@pingzh
Last active January 6, 2022 19:32
Airflow perf test for TaskInstance creation inside DagRun.verify_integrity. The test runs against a database with no other traffic.
import time
import logging
from airflow.utils.db import create_session
from airflow.utils import timezone
from airflow.models import TaskInstance
from airflow.models.serialized_dag import SerializedDagModel
logger = logging.getLogger(__name__)
out_hdlr = logging.FileHandler('./log.txt')
out_hdlr.setFormatter(logging.Formatter('%(asctime)s %(message)s'))
out_hdlr.setLevel(logging.INFO)
logger.addHandler(out_hdlr)
logger.setLevel(logging.INFO)
def create_tis_in_new_dag_run(dag, execution_date, number_of_tis):
    tasks = list(dag.task_dict.values())[0:number_of_tis]
    t1 = time.time()
    success = True
    try:
        with create_session() as session:
            for task in tasks:
                ti = TaskInstance(task, execution_date)
                session.add(ti)
            # Flush once after adding all TIs so the inserts go out in a batch.
            session.flush()
    except Exception:
        success = False
    t2 = time.time()
    logger.info('Created %s tis. success?: %s, perf: %s', number_of_tis, success, t2 - t1)
    return t2 - t1, success
def perf_tis_creation(dag):
    perf = {}
    for number_of_tis in [1000, 3000, 5000, 10000, 15000, 20000, 25000]:
        next_run_date = timezone.utcnow()
        # Unpack into a separate name so the perf dict is not overwritten.
        duration, success = create_tis_in_new_dag_run(dag, next_run_date, number_of_tis)
        perf[number_of_tis] = (duration, success)
        time.sleep(5)
    return perf
if __name__ == '__main__':
    dag_id = 'a-very-large-dag'
    dm = SerializedDagModel.get(dag_id)
    dag = dm.dag
    perf_tis_creation(dag)
ashb commented Jan 6, 2022

Yeah I was surprised by just how much it sped up!

potiuk commented Jan 6, 2022

Good stuff @ashb! I remember times when I got even better results by just batching DB writes (Telecom Billing process). I think there are only two reasons why you should NOT do batching:

  1. When you care about transactional integrity "per object"
  2. When you have risk of deadlocks

I think in this case we have neither the 1) need nor 2) risk.
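To illustrate the batching idea in isolation, here is a minimal sketch using only stdlib sqlite3 (not Airflow's session or a real TaskInstance table — the `ti` table and `task_id` column are made up for the example). It inserts the same rows one statement per row and via a single `executemany` batch, mirroring the per-object vs batched write trade-off described above:

```python
import sqlite3
import time

def insert_one_by_one(conn, rows):
    # One round trip through the statement machinery per row.
    for r in rows:
        conn.execute("INSERT INTO ti (task_id) VALUES (?)", (r,))
    conn.commit()

def insert_batched(conn, rows):
    # All rows submitted as one batch before a single commit.
    conn.executemany("INSERT INTO ti (task_id) VALUES (?)", [(r,) for r in rows])
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ti (task_id TEXT)")
rows = [f"task_{i}" for i in range(10000)]

t0 = time.time()
insert_batched(conn, rows)
print(f"batched insert of {len(rows)} rows took {time.time() - t0:.3f}s")

count = conn.execute("SELECT COUNT(*) FROM ti").fetchone()[0]
print(count)
```

Both variants end with the same row count; the difference is purely in how many statement executions and commits happen, which is where the speedup above comes from. Note the batched path also gives up per-row error handling, which is point 1) above.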
