Created
April 28, 2021 14:43
-
-
Save nyurik/b1e8f387f9a88085a7fbfcace9c7c115 to your computer and use it in GitHub Desktop.
A script to detect when Terraform projects or depended modules are part of a GIT pull request change
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# A script to detect when Terraform projects, or the modules they depend on, are changed by a Git pull request
# | |
# Usage: python3 pr_tf_changes.py <branch> <dir>... | |
# | |
# <branch> GIT branch to compare using git diff branch... shell call | |
# <dir> One or more directories to monitor, including all sub-dirs, relative to repo's root | |
# | |
# Set DEBUG env var to see additional debugging information | |
# If match is found, exitcode is 0, otherwise 1 | |
# | |
# Requires python-hcl2 lib v2+ | |
# License: MIT | |
# ASSUMPTION: This script is stored one level below the repo root, e.g. in <reporoot>/scripts/is_tf_in_pr.py | |
# | |
import os | |
import subprocess | |
import sys | |
from pathlib import Path | |
import hcl2 | |
def print_list(title: str, values) -> None:
    """Print a title followed by the sorted string forms of *values*.

    Each value is converted with ``str()``, the strings are sorted
    lexicographically, and every entry is written on its own line,
    prefixed by a single space.

    :param title: heading printed on the first line
    :param values: any iterable; items only need to support ``str()``
    """
    entries = sorted(str(v) for v in values)
    print(f"{title}\n " + "\n ".join(entries))
def scan_sub_dirs(start_dir: Path, result: set) -> None:
    """Add *start_dir* and all of its non-hidden sub-directories to *result*.

    Sub-directories are stored in resolved form (symlinks followed), and a
    directory already present in *result* is never descended into again. That
    check is what stops the walk from looping forever when a symlink points
    back to one of its parents.

    Directories whose name starts with "." are skipped, together with
    everything below them.

    :param start_dir: directory to start from; callers pass a resolved path
    :param result: set of ``Path`` objects, updated in place
    """
    # An explicit stack instead of recursion: very deep trees cannot hit
    # Python's recursion limit.
    pending = [start_dir]
    while pending:
        current = pending.pop()
        if current in result:
            continue  # already visited (also breaks symlink cycles)
        result.add(current)
        for entry in current.iterdir():
            if entry.is_dir() and not entry.name.startswith("."):
                pending.append(entry.resolve())
# Extra diagnostics are printed whenever the DEBUG env var is set (to any value)
debug = "DEBUG" in os.environ

# ASSUMPTION: This script is stored one level below the repo root, e.g. in <reporoot>/scripts/is_tf_in_pr.py
repo_dir = Path(__file__).parent.parent.resolve()

# Fail with a readable message instead of a bare IndexError when the branch is missing.
# sys.exit(<str>) prints the text to stderr and exits with a non-zero status.
if len(sys.argv) < 2:
    sys.exit(f"Usage: python3 {Path(__file__).name} <branch> <dir>...")
target_branch = sys.argv[1]

# All arguments after the branch name are treated as dirs relative to the repo root
tf_new_dirs = set()
for tf_dir_arg in sys.argv[2:]:
    tf_dir = (repo_dir / tf_dir_arg).resolve()
    if not tf_dir.is_dir():
        raise ValueError(
            f"All parameters except for the first are expected to be existing directories. Dir '{tf_dir}' does not exist")
    # Seed the work list with the directory itself plus every non-hidden sub-dir
    scan_sub_dirs(tf_dir, tf_new_dirs)
# | |
# Find all directories that are part of this project, including module dirs | |
# | |
def _local_module_dirs(tf_dir, tf_file):
    """Yield ``(module_name, resolved_dir)`` for every local module used in *tf_file*.

    The file is parsed with python-hcl2. Module blocks with an unexpected
    shape or without a usable ``source`` are reported and skipped. Sources
    that are not local paths (registry addresses, git/http URLs, ...) are
    skipped as well, because they do not refer to a directory in this repo.

    :param tf_dir: directory containing *tf_file*; relative sources are resolved against it
    :param tf_file: path of the ``*.tf`` file to parse
    """
    code = hcl2.loads(tf_file.read_text("utf-8"))
    modules = code.get('module')
    if modules is None:
        if debug:
            print(f"No modules in {tf_file}")
        return
    for module_obj in modules:
        # Each entry is expected to be a one-item mapping: {module_name: params}
        if len(module_obj) != 1:
            print(f"WARNING: File {tf_file} has an unexpected module structure")
            continue
        # Read the single item without mutating the parsed document
        module, params = next(iter(module_obj.items()))
        try:
            source = params['source']
        except KeyError:
            print(f"WARNING: Module {module} has no 'source' parameter in {tf_file}")
            continue
        # The original code expected a single-element list here; a plain string
        # is accepted too, so both result shapes work.
        # NOTE(review): newer python-hcl2 releases appear to return a bare string - confirm against its changelog.
        if isinstance(source, str):
            source = [source]
        if len(source) != 1 or not isinstance(source[0], str):
            print(f"File {tf_file} module {module} has an unexpected number of sources: {source}")
            continue
        src = source[0]
        # Terraform only treats sources starting with "./" or "../" as local
        # paths; absolute paths are also followed, as the original code did.
        if not (src.startswith(("./", "../")) or Path(src).is_absolute()):
            if debug:
                print(f"Skipping non-local source {src} of {module} in {tf_file}")
            continue
        yield module, (tf_dir / src).resolve()


# Breadth-first expansion: start from the directories given on the command
# line and keep following local module sources until no new directory turns up.
tf_dirs = set()
while tf_new_dirs:
    tf_dirs.update(tf_new_dirs)
    examine_dirs, tf_new_dirs = tf_new_dirs, set()
    for tf_dir in examine_dirs:
        # Only *.tf files directly inside the directory are inspected here
        for tf_file in tf_dir.glob('*.tf'):
            for module, resolved_src in _local_module_dirs(tf_dir, tf_file):
                if resolved_src not in tf_dirs:
                    tf_new_dirs.add(resolved_src)
                if debug:
                    print(f"Uses {resolved_src} as {module} in {tf_file}")
if debug:
    print_list("Monitored changes:", tf_dirs)
# | |
# Find all directories modified by the given GIT branch | |
# | |
# The command is passed as an argument list (no string splitting), so a branch
# name containing whitespace stays a single argument.
# "-z" makes git print raw, NUL-terminated paths rather than newline-separated
# ones that may be quoted/escaped when they contain unusual characters.
git_output = subprocess.check_output(
    ["git", "diff", "--no-renames", "--name-only", "-z", f"origin/{target_branch}..."],
    cwd=repo_dir)

# Skip empty fields: the output ends with a NUL, and an empty diff yields no
# paths at all. Without the filter, the repo's parent dir would be recorded as "changed".
changed_dirs = {
    (repo_dir / os.fsdecode(raw_path)).resolve().parent
    for raw_path in git_output.split(b"\0")
    if raw_path
}
if debug:
    print_list("Detected changes in paths:", changed_dirs)

# Exit status is the script's result: 0 when a monitored dir changed, 1 otherwise.
# sys.exit is used because the bare exit() helper is only installed by the site module.
matches = changed_dirs.intersection(tf_dirs)
if matches:
    if debug:
        print_list("Found matches:", matches)
    else:
        print("match found")
    sys.exit(0)
else:
    print("no match found")
    sys.exit(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment