On multi-scheduler Airflow deployments (e.g. AWS MWAA with 4+ schedulers), tasks intermittently fail in bulk with no apparent cause, but succeed on manual retry. The root issue is in _executable_task_instances_to_queued: when the scheduler checks task concurrency limits and cannot find the serialized DAG, it immediately bulk-fails all SCHEDULED task instances for that DAG via a raw SQL UPDATE. This is an overly aggressive response to what is often a transient race condition during DAG file parsing or serialization refresh cycles.
In scheduler_job_runner.py, the _executable_task_instances_to_queued method loads the serialized DAG only when checking per-task or per-dagrun concurrency limits. If scheduler_dag_bag.get_dag_for_run() returns None (serialized DAG transiently absent), the code executes a bulk UPDATE task_instance SET state='failed' for all SCHEDULED tasks of that DAG, instead of treating it as a transient miss and leaving the tasks in SCHEDULED for the next scheduler loop.
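The safer behavior can be sketched in simplified form. This is not the actual Airflow code: the TaskInstance dataclass, the serialized_dags dict standing in for the DAG cache, and the schedule_tasks helper are all illustrative stand-ins; only the get_dag_for_run name and the SCHEDULED/QUEUED/FAILED states come from the report above. The key point is that a None lookup should skip the DAG for this loop iteration rather than fail its tasks:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class TaskInstance:
    task_id: str
    state: str  # "scheduled", "queued", or "failed"

def get_dag_for_run(serialized_dags: dict, dag_id: str) -> Optional[object]:
    # Stand-in for scheduler_dag_bag.get_dag_for_run(): returns None when
    # the serialized DAG is transiently absent (e.g. mid-refresh).
    return serialized_dags.get(dag_id)

def schedule_tasks(serialized_dags: dict, dag_id: str,
                   task_instances: List[TaskInstance]) -> List[TaskInstance]:
    """Queue SCHEDULED tasks, treating a serialized-DAG miss as transient."""
    dag = get_dag_for_run(serialized_dags, dag_id)
    if dag is None:
        # Transient miss: leave tasks in SCHEDULED so a later scheduler
        # loop picks them up, instead of bulk-failing them via raw SQL.
        return []
    queued = []
    for ti in task_instances:
        if ti.state == "scheduled":
            ti.state = "queued"
            queued.append(ti)
    return queued
```

Under the buggy behavior, the `dag is None` branch would instead set every SCHEDULED task to FAILED, which is why retrying manually (after the serialization refresh completes) succeeds.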