|
import os |
|
import re |
|
import shutil |
|
from typing import Dict, Tuple |
|
|
|
import pytest |
|
from _pytest.config import Config |
|
from _pytest.config.argparsing import Parser |
|
|
|
|
|
# Variable names matching this pattern (in any letter case) are treated as
# Apache Airflow variables; the conftest hook upper-cases them before export.
AIRFLOW_ENVS_PATTERN = re.compile(r"^airflow_.*$", flags=re.IGNORECASE)

# Built-in environment variables, stored as ``name -> (value, default)``.
# Every built-in entry is flagged ``default=True``: such a value is exported
# only when the variable is not already present in ``os.environ``.
DEFAULT_AIRFLOW_ENVS: Dict[str, Tuple[str, bool]] = {
    env_name: (env_value, True)
    for env_name, env_value in (
        ("AIRFLOW__CORE__EXECUTOR", "DebugExecutor"),
        ("AIRFLOW__CORE__UNIT_TEST_MODE", "True"),
        ("AIRFLOW__DEBUG__FAIL_FAST", "True"),
    )
}
|
|
|
|
|
def pytest_addoption(parser: Parser):
    """Register the plugin's ini-file options on *parser*.

    Declares ``airflow_envs`` (a line list of ``name=value`` entries),
    ``airflow_home`` and ``airflow_dags`` (both plain strings holding a path).
    """
    # One row per option: (name, ini type, help text, default value).
    ini_options = (
        (
            "airflow_envs",
            "linelist",
            "List of Apache Airflow Environment Variables (name=value)",
            [],
        ),
        (
            "airflow_home",
            "string",
            "Path to Apache Airflow Home directory",
            "./.airflow_home",
        ),
        (
            "airflow_dags",
            "string",
            "Path to Apache Airflow Dags directory",
            "./dags",
        ),
    )

    for option_name, ini_type, help_text, default_value in ini_options:
        parser.addini(
            option_name,
            type=ini_type,
            help=help_text,
            default=default_value,
        )
|
|
|
|
|
def _validate_directory(dir_path):
    """Check that *dir_path* points at an existing directory.

    Returns ``None`` when the check passes.

    Raises:
        FileNotFoundError: nothing exists at *dir_path*.
        NotADirectoryError: *dir_path* exists but is not a directory.
    """
    if os.path.isdir(dir_path):
        return

    # Not a directory: tell "exists as something else" apart from "missing".
    if os.path.exists(dir_path):
        raise NotADirectoryError(dir_path)

    raise FileNotFoundError(dir_path)
|
|
|
|
|
def _make_directory_path(base_path, root_dir, create=False):
    """Resolve *base_path* to an absolute path of an existing directory.

    ``~`` and environment variables in *base_path* are expanded first; a
    relative result is then anchored at *root_dir* and made absolute.

    Args:
        base_path: Directory path, possibly relative or containing ``~``/``$VAR``.
        root_dir: Base directory used when *base_path* is relative.
        create: When true, a missing directory is created instead of raising.

    Returns:
        The resolved absolute directory path.

    Raises:
        FileNotFoundError: the directory is missing and *create* is false.
        NotADirectoryError: the path exists but is not a directory.
    """
    expanded = os.path.expandvars(os.path.expanduser(base_path))

    if os.path.isabs(expanded):
        resolved = expanded
    else:
        resolved = os.path.abspath(os.path.join(root_dir, expanded))

    try:
        _validate_directory(resolved)
    except FileNotFoundError:
        if create:
            os.makedirs(resolved)
        else:
            raise

    return resolved
|
|
|
|
|
def init_airflow_db(airflow_home: str, unittests_mode: bool = True):
    """Initialise (upgrade) the Apache Airflow metadata database.

    Outside unit-test mode this only calls Airflow's ``upgradedb()``.

    In unit-test mode the file ``unittests.db`` under *airflow_home* is
    removed and, when a ``unittests.db.init`` snapshot exists, restored from
    it before ``upgradedb()`` runs. Without a snapshot, ``upgradedb()`` runs
    first and the resulting ``unittests.db`` is then saved as the snapshot, so
    that later runs can start from it.

    Args:
        airflow_home: Directory holding ``unittests.db`` and its snapshot.
        unittests_mode: Whether to use the snapshot/restore workflow.

    Returns:
        ``True`` in unit-test mode, ``None`` otherwise.
    """
    # Imported lazily: Airflow must not be imported before the environment
    # variables have been set up.
    from airflow.utils.db import upgradedb

    if not unittests_mode:
        upgradedb()
        return None

    live_db = os.path.join(airflow_home, "unittests.db")
    snapshot_db = os.path.join(airflow_home, "unittests.db.init")

    # Always start from a clean slate; a missing database file is fine.
    try:
        os.remove(live_db)
    except FileNotFoundError:
        pass

    # Restoring a previously initialised snapshot speeds up start-up of the
    # next test run compared with initialising the database from scratch.
    snapshot_available = os.path.exists(snapshot_db)
    if snapshot_available:
        shutil.copy(snapshot_db, live_db)

    upgradedb()

    if not snapshot_available:
        # Best effort: presumably upgradedb() created unittests.db under
        # airflow_home (TODO confirm); if it did not, skip the snapshot.
        try:
            shutil.copy(live_db, snapshot_db)
        except FileNotFoundError:
            pass

    return True
|
|
|
|
|
def _env_flag_enabled(raw_value):
    """Return ``True`` when *raw_value* spells a true boolean.

    Only ``t``, ``true`` and ``1`` (any letter case, surrounding whitespace
    ignored) count as true; every other value, including the empty string,
    is false.
    """
    # NOTE(review): the accepted spellings are meant to mirror the true values
    # of Airflow's own boolean config parsing - confirm against Airflow docs.
    return str(raw_value).strip().lower() in ("t", "true", "1")


@pytest.hookimpl(tryfirst=True)
def pytest_load_initial_conftests(args: list, early_config: Config, parser: Parser):
    """Export Apache Airflow environment variables and initialise its database.

    Steps, in order:

    1. Start from ``DEFAULT_AIRFLOW_ENVS`` and apply each ``airflow_envs``
       ini line of the form ``name=value``. A ``D:`` prefix on the name marks
       the value as a default, used only when the variable is not already set
       in ``os.environ``. Names matching ``AIRFLOW_ENVS_PATTERN`` are
       upper-cased.
    2. Unless ``AIRFLOW__CORE__DAGS_FOLDER`` is already in ``os.environ``,
       take it from the collected variables (validating that directory) or
       resolve it from the ``airflow_dags`` ini option.
    3. Unless ``AIRFLOW_HOME`` is configured anywhere, resolve it from the
       ``airflow_home`` ini option, creating the directory if needed.
    4. Export the variables to ``os.environ`` and call ``init_airflow_db``.

    Args:
        args: Command-line arguments (unused; part of the hook signature).
        early_config: pytest config used to read ini options and the root path.
        parser: pytest option parser (unused; part of the hook signature).

    Raises:
        FileNotFoundError: a required directory does not exist.
        NotADirectoryError: a configured path exists but is not a directory.
    """
    env_vars = DEFAULT_AIRFLOW_ENVS.copy()

    for raw_env in early_config.getini("airflow_envs"):
        key, _, value = raw_env.partition("=")
        key = key.strip()
        value = value.strip()

        # "D:NAME=value" marks the value as a default that must not override
        # a variable already present in the process environment.
        default = key.startswith("D:")
        if default:
            key = key[2:]

        if AIRFLOW_ENVS_PATTERN.match(key):
            # Upper case for AIRFLOW variables
            key = key.upper()

        env_vars[key] = (value, default)

    dags_key = "AIRFLOW__CORE__DAGS_FOLDER"
    if dags_key not in os.environ:
        if dags_key not in env_vars:
            airflow_dags = _make_directory_path(
                early_config.getini("airflow_dags"),
                early_config.rootpath,
            )
            env_vars[dags_key] = (airflow_dags, False)
        else:
            # Entries are (value, default) tuples: validate the path itself,
            # not the tuple (passing the tuple raised TypeError).
            _validate_directory(env_vars[dags_key][0])

    if "AIRFLOW_HOME" not in env_vars and "AIRFLOW_HOME" not in os.environ:
        airflow_home = _make_directory_path(
            early_config.getini("airflow_home"),
            early_config.rootpath,
            create=True,
        )
        env_vars["AIRFLOW_HOME"] = (airflow_home, True)

    for key, (value, default) in env_vars.items():
        # Non-default values always win; defaults only fill in missing ones.
        if not default or key not in os.environ:
            os.environ[key] = value

    # Parse the flag explicitly: bool() of any non-empty string is True, which
    # would turn "False" or "0" into unit-test mode.
    unittests_mode = _env_flag_enabled(
        os.environ.get("AIRFLOW__CORE__UNIT_TEST_MODE", "")
    )

    init_airflow_db(
        os.environ["AIRFLOW_HOME"],
        unittests_mode=unittests_mode,
    )