Skip to content

Instantly share code, notes, and snippets.

@Taragolis
Created September 10, 2021 08:24
Show Gist options
  • Save Taragolis/766c699f3f9a0065765bfedf5eacc633 to your computer and use it in GitHub Desktop.
Save Taragolis/766c699f3f9a0065765bfedf5eacc633 to your computer and use it in GitHub Desktop.

Custom pytest plugins

Apache Airflow pytest plugin

This plugin helps set up an Apache Airflow unit-test environment.

Plugin capabilities

  • Sets required and optional configuration through environment variables before any Apache Airflow module is imported
  • Initialises the unit-test database (SQLite)
  • Keeps a backup of the initialised unit-test database (SQLite) to speed up subsequent test runs
  • Passes already-defined environment variables through unchanged when required

pytest ini options added by the plugin

  • airflow_home - Absolute or relative path to the Apache Airflow home directory. Relative paths are resolved against the project root directory. Defaults to .airflow_home in the project root directory.
  • airflow_dags - Absolute or relative path to the Apache Airflow DAGs directory. Relative paths are resolved against the project root directory. Defaults to dags in the project root directory.
  • airflow_envs - List of environment variables (NAME=value) that are set before any Airflow module is imported. Prefix an entry with D: (default) if it should not override an environment variable that already exists. This matches the behaviour of the pytest-env plugin.

Samples

pytest.ini

[pytest]
airflow_home = ~/airflow/
airflow_dags = ./dags/
airflow_envs =
	D:AIRFLOW__CORE__DEFAULT_TIMEZONE=utc
	AIRFLOW__CORE__PLUGINS_FOLDER=/home/awesomeuser/plugins/

setup.cfg

[tool:pytest]
airflow_home = ~/airflow/
airflow_dags = ./dags/
airflow_envs =
	D:AIRFLOW__CORE__DEFAULT_TIMEZONE=utc
	AIRFLOW__CORE__PLUGINS_FOLDER=/home/awesomeuser/plugins/
import os
import re
import shutil
from typing import Dict, Tuple
import pytest
from _pytest.config import Config
from _pytest.config.argparsing import Parser
# Keys matching this pattern (case-insensitively) are treated as Airflow settings
# and are upper-cased before being exported, e.g. ``airflow__core__x`` becomes
# ``AIRFLOW__CORE__X``.
AIRFLOW_ENVS_PATTERN = re.compile(r"^airflow_.*$", flags=re.I)

# Environment variables the plugin always provides, mapped as
# ``name -> (value, is_default)``. ``is_default=True`` has the same meaning as the
# ``D:`` prefix in the ``airflow_envs`` ini option: the value is only applied when
# the variable is not already present in ``os.environ``.
DEFAULT_AIRFLOW_ENVS: Dict[str, Tuple[str, bool]] = dict(
    AIRFLOW__CORE__EXECUTOR=("DebugExecutor", True),
    AIRFLOW__CORE__UNIT_TEST_MODE=("True", True),
    AIRFLOW__DEBUG__FAIL_FAST=("True", True),
)
def pytest_addoption(parser: Parser):
    """Register the plugin's ini keys so they can be set in pytest.ini / setup.cfg.

    Registered keys:
        airflow_envs: ``linelist`` of ``[D:]NAME=value`` entries, default ``[]``.
        airflow_home: ``string`` path to the Airflow home, default ``./.airflow_home``.
        airflow_dags: ``string`` path to the DAGs folder, default ``./dags``.
    """
    # (name, ini type, help text, default). The tuple is built on every call, so
    # each call hands pytest a fresh empty list as the ``airflow_envs`` default.
    ini_specs = (
        ("airflow_envs", "linelist",
         "List of Apache Airflow Environment Variables (name=value)", []),
        ("airflow_home", "string",
         "Path to Apache Airflow Home directory", "./.airflow_home"),
        ("airflow_dags", "string",
         "Path to Apache Airflow Dags directory", "./dags"),
    )
    for ini_name, ini_type, help_text, default_value in ini_specs:
        parser.addini(ini_name, type=ini_type, help=help_text, default=default_value)
def _validate_directory(dir_path):
if not os.path.exists(dir_path):
raise FileNotFoundError(dir_path)
elif not os.path.isdir(dir_path):
raise NotADirectoryError(dir_path)
def _make_directory_path(base_path, root_dir, create=False):
base_path = os.path.expanduser(base_path)
base_path = os.path.expandvars(base_path)
if not os.path.isabs(base_path):
base_path = os.path.abspath(os.path.join(root_dir, base_path))
try:
_validate_directory(base_path)
except FileNotFoundError:
if not create:
raise
os.makedirs(base_path)
return base_path
def init_airflow_db(airflow_home: str, unittests_mode: bool = True):
    """Create or migrate the Airflow metadata database.

    Outside unit-test mode this only calls Airflow's ``upgradedb()`` and returns
    ``None``.

    In unit-test mode it rebuilds ``<airflow_home>/unittests.db`` from scratch
    (presumably the SQLite file Airflow's unit-test mode points at; confirm):

    * if a snapshot ``<airflow_home>/unittests.db.init`` exists, it is copied into
      place and ``upgradedb()`` is run on it;
    * otherwise ``upgradedb()`` builds the database and the result is copied to
      the snapshot path so later sessions can start from it.

    Returns ``True`` in unit-test mode.
    """
    # Imported lazily so Airflow is not loaded before the plugin has exported
    # its environment variables.
    from airflow.utils.db import upgradedb

    if not unittests_mode:
        upgradedb()
        return

    db_file = os.path.join(airflow_home, "unittests.db")
    snapshot_file = os.path.join(airflow_home, "unittests.db.init")

    # Drop whatever database a previous session left behind; a missing file is fine.
    try:
        os.remove(db_file)
    except FileNotFoundError:
        pass

    if os.path.exists(snapshot_file):
        # Seed from the snapshot; upgradedb() presumably only has to apply
        # migrations newer than the snapshot.
        shutil.copy(snapshot_file, db_file)
        upgradedb()
        return True

    upgradedb()
    try:
        shutil.copy(db_file, snapshot_file)
    except FileNotFoundError:
        # The database file was not produced at this path (presumably a non-SQLite
        # connection is configured); simply skip taking the snapshot.
        pass
    return True
def _parse_bool(value) -> bool:
    """Interpret an environment-style flag (``"True"``, ``"false"``, ``"1"``...) as a bool."""
    if isinstance(value, str):
        # ``bool("False")`` is True for any non-empty string, so compare the text instead.
        return value.strip().lower() in ("1", "t", "true", "y", "yes", "on")
    return bool(value)


def _parse_airflow_envs(raw_envs) -> Dict[str, Tuple[str, bool]]:
    """Parse ``airflow_envs`` ini lines (``[D:]NAME=value``) into ``{name: (value, is_default)}``."""
    parsed: Dict[str, Tuple[str, bool]] = {}
    for raw_env in raw_envs:
        key, _, value = raw_env.partition("=")
        key = key.strip()
        value = value.strip()
        # ``D:`` marks a default: it must not override an already-set variable.
        is_default = key.startswith("D:")
        if is_default:
            key = key[2:].strip()
        if AIRFLOW_ENVS_PATTERN.match(key):
            # Airflow settings are exported in upper case.
            key = key.upper()
        parsed[key] = (value, is_default)
    return parsed


@pytest.hookimpl(tryfirst=True)
def pytest_load_initial_conftests(args: list, early_config: Config, parser: Parser):
    """Export Airflow environment variables and initialise the metadata DB.

    ``tryfirst`` makes this implementation run ahead of other
    ``pytest_load_initial_conftests`` implementations. In order, it:

    1. merges ``DEFAULT_AIRFLOW_ENVS`` with the entries of the ``airflow_envs`` ini
       option (ini entries win);
    2. if ``AIRFLOW__CORE__DAGS_FOLDER`` is not already in the environment, either
       validates the configured folder or resolves the ``airflow_dags`` ini path
       against the rootdir (it must already exist);
    3. if ``AIRFLOW_HOME`` is neither configured nor in the environment, resolves
       the ``airflow_home`` ini path against the rootdir, creating it if needed;
    4. exports every variable: non-default entries always overwrite, and default
       (``D:``) entries only fill in variables that are missing;
    5. calls ``init_airflow_db``, in unit-test mode when
       ``AIRFLOW__CORE__UNIT_TEST_MODE`` is truthy.

    Raises:
        FileNotFoundError / NotADirectoryError: the DAGs folder is missing or is
            not a directory.
    """
    env_vars = DEFAULT_AIRFLOW_ENVS.copy()
    env_vars.update(_parse_airflow_envs(early_config.getini("airflow_envs")))

    if "AIRFLOW__CORE__DAGS_FOLDER" not in os.environ:
        if "AIRFLOW__CORE__DAGS_FOLDER" in env_vars:
            # Values are (value, is_default) tuples; validate the path itself.
            _validate_directory(env_vars["AIRFLOW__CORE__DAGS_FOLDER"][0])
        else:
            airflow_dags = _make_directory_path(
                early_config.getini("airflow_dags"),
                early_config.rootpath,
            )
            env_vars["AIRFLOW__CORE__DAGS_FOLDER"] = (airflow_dags, False)

    if "AIRFLOW_HOME" not in env_vars and "AIRFLOW_HOME" not in os.environ:
        airflow_home = _make_directory_path(
            early_config.getini("airflow_home"),
            early_config.rootpath,
            create=True,
        )
        env_vars["AIRFLOW_HOME"] = (airflow_home, True)

    for key, (value, is_default) in env_vars.items():
        if not is_default or key not in os.environ:
            os.environ[key] = value

    unittests_mode = _parse_bool(os.environ.get("AIRFLOW__CORE__UNIT_TEST_MODE", False))
    init_airflow_db(os.environ["AIRFLOW_HOME"], unittests_mode=unittests_mode)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment