Skip to content

Instantly share code, notes, and snippets.

@nyurik
Created April 28, 2021 14:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nyurik/b1e8f387f9a88085a7fbfcace9c7c115 to your computer and use it in GitHub Desktop.
Save nyurik/b1e8f387f9a88085a7fbfcace9c7c115 to your computer and use it in GitHub Desktop.
A script to detect when Terraform projects or depended modules are part of a GIT pull request change
#!/usr/bin/env python3
# A script to detect whether Terraform projects, or the modules they depend on, are changed by a Git pull request
#
# Usage: python3 pr_tf_changes.py <branch> <dir>...
#
# <branch> Target branch name; changed files are listed with "git diff origin/<branch>..."
# <dir> One or more directories to monitor, including all sub-dirs, relative to repo's root
#
# Set DEBUG env var to see additional debugging information
# If match is found, exitcode is 0, otherwise 1
#
# Requires python-hcl2 lib v2+
# License: MIT
# ASSUMPTION: This script is stored one level below the repo root, e.g. in <reporoot>/scripts/is_tf_in_pr.py
#
import os
import subprocess
import sys
from pathlib import Path
import hcl2
def print_list(title, values):
    """Print ``title`` followed by every value, sorted, one per line.

    Args:
        title: Heading printed on the first line.
        values: Iterable of items; each is converted with ``str()`` and the
            resulting strings are sorted before printing.
    """
    lines = sorted(str(v) for v in values)
    print(f"{title}\n " + "\n ".join(lines))
def scan_sub_dirs(start_dir: Path, result: set):
    """Add ``start_dir`` and every non-hidden directory below it to ``result``.

    Args:
        start_dir: Directory to start from. It is added exactly as given, so
            callers should pass an already-resolved path.
        result: Set of directories, updated in place. Directories already in
            the set are treated as visited and are not scanned again.
    """
    # An explicit stack is used instead of recursion so that very deep
    # directory trees cannot hit Python's recursion limit.
    pending = [start_dir]
    while pending:
        current = pending.pop()
        if current in result:
            # Already visited. This also prevents infinite loops in case a
            # symlink points to its parent.
            continue
        result.add(current)
        for entry in current.iterdir():
            # Directories whose name starts with "." are skipped entirely.
            if entry.is_dir() and not entry.name.startswith("."):
                pending.append(entry.resolve())
# Extra diagnostic output is enabled by the mere presence of the DEBUG env var.
debug = "DEBUG" in os.environ

# ASSUMPTION: This script is stored one level below the repo root, e.g. in <reporoot>/scripts/is_tf_in_pr.py
repo_dir = Path(__file__).parent.parent.resolve()

# The branch argument is mandatory. Exit code 2 is used for a usage error so
# that it cannot be confused with the "no match found" result (exit code 1).
if len(sys.argv) < 2:
    print(f"Usage: {sys.argv[0]} <branch> <dir>...", file=sys.stderr)
    sys.exit(2)
target_branch = sys.argv[1]

# All arguments after the branch are treated as dirs relative to the repo root.
# Each one is expanded to include all of its non-hidden sub-directories.
tf_new_dirs = set()
for tf_dir_arg in sys.argv[2:]:
    tf_dir = (repo_dir / tf_dir_arg).resolve()
    if not tf_dir.is_dir():
        raise ValueError(
            f"All parameters except for the first are expected to be existing directories. Dir '{tf_dir}' does not exist")
    scan_sub_dirs(tf_dir, tf_new_dirs)
#
# Find all directories that are part of this project, including module dirs
#
# Work in rounds: each round parses the *.tf files of the directories found in
# the previous round and collects the local module directories they reference.
# The loop ends when a round discovers no directory that is not already known.
tf_dirs = set()
while tf_new_dirs:
    tf_dirs.update(tf_new_dirs)
    examine_dirs = tf_new_dirs
    tf_new_dirs = set()
    for tf_dir in examine_dirs:
        for tf_file in tf_dir.glob('*.tf'):
            code = hcl2.loads(tf_file.read_text("utf-8"))
            try:
                modules = code['module']
            except KeyError:
                if debug:
                    print(f"No modules in {tf_file}")
                continue
            for module_obj in modules:
                # Each entry is expected to be a single {module_name: params} mapping.
                if len(module_obj) != 1:
                    print(f"WARNING: File {tf_file} has an unexpected module structure")
                    continue
                module, params = module_obj.popitem()
                try:
                    source = params['source']
                except KeyError:
                    print(f"WARNING: Module {module} has no 'source' parameter in {tf_file}")
                    continue
                # The original code expects the value wrapped in a one-element list.
                # NOTE(review): newer python-hcl2 releases appear to return the
                # bare string instead - both shapes are accepted here; confirm
                # against the installed version.
                if isinstance(source, str):
                    source = [source]
                if len(source) != 1:
                    print(f"File {tf_file} module {module} has an unexpected number of sources: {source}")
                    continue
                source_path = source[0]
                # Terraform only treats sources starting with "./" or "../" as
                # local directories. Anything else (registry address, git URL,
                # ...) is not a directory in this repo and must not be resolved
                # against tf_dir.
                if not isinstance(source_path, str) or not source_path.startswith(("./", "../")):
                    if debug:
                        print(f"Skipping non-local source {source_path} of {module} in {tf_file}")
                    continue
                resolved_src = (tf_dir / source_path).resolve()
                if resolved_src not in tf_dirs:
                    tf_new_dirs.add(resolved_src)
                if debug:
                    print(f"Uses {resolved_src} as {module} in {tf_file}")
if debug:
    print_list("Monitored changes:", tf_dirs)
#
# Find all directories modified by the given GIT branch
#
# The command is passed as an explicit argument list, so a branch name is never
# split on whitespace. "-z" makes git print raw, NUL-terminated path names
# instead of quoting/escaping paths that contain non-ASCII or special
# characters, which would otherwise never match a real directory.
git_output = subprocess.check_output(
    ["git", "diff", "--no-renames", "--name-only", "-z", f"origin/{target_branch}..."],
    cwd=repo_dir)
# Empty entries are dropped: an empty diff (or the trailing NUL) would otherwise
# turn into a bogus "changed" directory. The parent directory is taken before
# resolving so that a changed symlink file counts for the directory it lives
# in, not for the directory of its target.
changed_dirs = {
    (repo_dir / os.fsdecode(name)).parent.resolve()
    for name in git_output.split(b"\0")
    if name
}
if debug:
    print_list("Detected changes in paths:", changed_dirs)

# Exit code 0 signals that at least one monitored directory was changed, 1 that
# none was. sys.exit is used because the bare exit() helper is only provided
# by the site module.
matches = changed_dirs & tf_dirs
if matches:
    if debug:
        print_list("Found matches:", matches)
    else:
        print("match found")
    sys.exit(0)
else:
    print("no match found")
    sys.exit(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment