Skip to content

Instantly share code, notes, and snippets.

@nelabhotlaR
Last active December 7, 2023 13:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nelabhotlaR/2ee4aa45054acf5ae11351d294753bc4 to your computer and use it in GitHub Desktop.
Save nelabhotlaR/2ee4aa45054acf5ae11351d294753bc4 to your computer and use it in GitHub Desktop.
Triggering Chef InSpec tests with Airflow DAG
"""
Airflow DAG to run Chef InSpec tests.
"""
import os
import sys
from datetime import datetime, timedelta
from airflow.models import Variable
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow import DAG
sys.path.append(os.path.dirname((os.path.abspath(__file__))))
import config
class ChefInspecDAG:
    """
    Builds an Airflow DAG that runs a Chef InSpec profile over SSH.

    Attributes:
        dag_id (str): The unique identifier for the DAG.
        profile_name (str): The name of the Chef InSpec profile to be executed.
        interview_scheduler_instance_id (str): The instance ID of the interview
            scheduler, taken from ``config``.
        ip_address (str): The IP address of the interview scheduler, taken from
            ``config``; used as the SSH target of the InSpec run.

    Methods:
        create_dag(): Creates and returns the DAG object.
        run_chef_inspec_test(private_key_file_path, **kwargs): Executes the
            Chef InSpec tests.
    """

    def __init__(self, dag_id, profile_name):
        """
        Store the DAG settings and export the target host details.

        Args:
            dag_id (str): The unique identifier for the DAG.
            profile_name (str): Name of the InSpec profile directory inside the
                cloned repository.
        """
        self.dag_id = dag_id
        self.profile_name = profile_name
        # Exported to the process environment as in the original code;
        # presumably read by the InSpec profile or other tooling -- TODO confirm.
        os.environ['INTERVIEW_SCHEDULER_INSTANCE_ID'] = config.INTERVIEW_SCHEDULER_INSTANCE_ID
        os.environ['INTERVIEW_SCHEDULER_IP'] = config.INTERVIEW_SCHEDULER_IP
        self.interview_scheduler_instance_id = config.INTERVIEW_SCHEDULER_INSTANCE_ID
        self.ip_address = config.INTERVIEW_SCHEDULER_IP

    def create_dag(self):
        """
        Creates the DAG object with specified configurations.

        Only declares the DAG and its single task. No secret is read or written
        and no command is executed here, because this method runs every time
        the DAG file is parsed.

        Returns:
            DAG: The DAG object.
        """
        default_args = {
            'owner': 'Chef',
            'depends_on_past': False,
            'start_date': datetime(2023, 10, 17),
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 1,
            'retry_delay': timedelta(minutes=5),
        }
        dag = DAG(
            self.dag_id,
            default_args=default_args,
            description='A DAG to run Chef InSpec tests',
            schedule=None,
        )
        PythonOperator(
            task_id=f'run_{self.profile_name}_test',
            python_callable=self.run_chef_inspec_test,
            op_args=[config.PRIVATE_KEY_FILE_PATH],
            dag=dag,
        )
        return dag

    @staticmethod
    def _write_private_key(private_key_file_path, private_key):
        """
        Write ``private_key`` to ``private_key_file_path`` with mode 0600.

        Args:
            private_key_file_path (str): Destination file; overwritten if it
                already exists.
            private_key (str): Key material to store.
        """
        descriptor = os.open(
            private_key_file_path,
            os.O_WRONLY | os.O_CREAT | os.O_TRUNC,
            0o600,
        )
        with os.fdopen(descriptor, 'w', encoding='UTF-8') as key_file:
            # The mode given to os.open only applies to newly created files,
            # so tighten a pre-existing file before the secret is written.
            os.chmod(private_key_file_path, 0o600)
            key_file.write(private_key)

    def _run_bash(self, step, bash_command, dag):
        """
        Run ``bash_command`` immediately through a BashOperator.

        Args:
            step (str): Prefix of the operator's task id; the profile name is
                appended to it.
            bash_command (str): Shell command line to execute.
            dag: DAG the operator is attached to (may be None).

        Returns:
            Whatever ``BashOperator.execute`` returns.
        """
        return BashOperator(
            task_id=f'{step}_{self.profile_name}',
            bash_command=bash_command,
            dag=dag,
        ).execute(context={})

    def run_chef_inspec_test(self, private_key_file_path, **kwargs):
        """
        Executes Chef InSpec tests within an Airflow DAG context.

        Args:
            private_key_file_path (str): Path the SSH private key is written to
                for the duration of the run. The file is overwritten at the
                start and removed at the end.
            **kwargs: Additional keyword arguments (used by Airflow); ``dag`` is
                read from here when present.

        Raises:
            Exception: Any error raised while fetching the key or running a
                step is propagated, so the Airflow task is marked failed and
                the configured retries apply.

        The method performs:
            - Writing the private key from the Airflow Variable
              ``INTERVIEW_SCHEDULER_PRIVATE_KEY`` to ``private_key_file_path``.
            - Creation of a temporary folder for testing purposes.
            - Cloning a Git repository into the temporary folder.
            - Executing Chef InSpec tests using the provided private key.
            - Generates both CLI and HTML reports.
            - Cleans up temporary files after completion.
        """
        # .get() keeps the method callable outside a full Airflow context.
        dag = kwargs.get('dag')
        try:
            # The key is written inside the try block so the cleanup below
            # always removes it, even if a later step fails.
            self._write_private_key(
                private_key_file_path,
                Variable.get("INTERVIEW_SCHEDULER_PRIVATE_KEY"),
            )
            self._run_bash(
                'create_temp_folder',
                f'mkdir -p {config.TEMPORARY_FOLDER}',
                dag,
            )
            # Start from a fresh checkout: drop any existing folder first.
            self._run_bash(
                'git_clone',
                f"if [ -d {config.TEMPORARY_FOLDER} ]; then "
                f"rm -rf {config.TEMPORARY_FOLDER}; "
                f"fi && "
                f"git clone {config.INTERVIEW_GITHUB_REPO} {config.TEMPORARY_FOLDER}",
                dag,
            )
            self._run_bash(
                'inspec_exec',
                f"inspec exec {config.TEMPORARY_FOLDER}/{self.profile_name} "
                f"-t ssh://ubuntu@{self.ip_address} "
                f"-i {private_key_file_path} "
                f"--no-distinct-exit "
                f"--reporter=cli html:{config.TEST_REPORT} && "
                f"echo 'Inspec execution completed successfully'",
                dag,
            )
        finally:
            # No except clause on purpose: failures must reach Airflow instead
            # of being printed and swallowed.
            self._run_bash(
                'cleanup',
                f"rm -rf {config.TEMPORARY_FOLDER} && "
                f"rm -rf {private_key_file_path}",
                dag,
            )
# Build the DAG at import time and keep it bound to a module-level name.
chef_dag = ChefInspecDAG(dag_id='chefinspec_interview_dag', profile_name=config.PROFILE)
my_dag = chef_dag.create_dag()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment