Skip to content

Instantly share code, notes, and snippets.

@kmcquade
Created September 20, 2021 18:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kmcquade/0e26d3841d1c063cd7ecc9df0f132128 to your computer and use it in GitHub Desktop.
Save kmcquade/0e26d3841d1c063cd7ecc9df0f132128 to your computer and use it in GitHub Desktop.
Run Terraform get recursively using python-terraform
python_terraform==0.10.1
#! /usr/bin/env python3
import os
import logging
import argparse
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
from python_terraform import Terraform
import fnmatch
import shutil
logger = logging.getLogger(__name__)
END = "\033[0m"
GREY = "\33[90m"
def terraform_get(tf_directory: str, thread_count: int = 3, clean_existing: bool = False, update_terraform_modules: bool = False):
repo_directory = os.path.abspath(tf_directory)
if not os.path.exists(repo_directory):
raise Exception(f"Please provide a valid directory. Directory provided: {repo_directory}")
if clean_existing:
print("Removing any existing .terraform directories...")
# Remove any existing .terraform directories
purge(directory=repo_directory, pattern="*.terraform")
# Create a list of directory types to skip
unfiltered_subfolders = [x[0] for x in os.walk(repo_directory)]
exclude_directory_patterns = [
".git", ".idea", ".DSStore", ".venv", ".serverless", "node_modules", ".terraform"
]
all_subfolders = []
for sub in unfiltered_subfolders:
for pattern in exclude_directory_patterns:
if pattern not in sub:
all_subfolders.append(sub)
# Find subfolders that have Terraform files in it (not including .terraform folders)
subfolders_with_terraform = set()
for subfolder in all_subfolders:
file_list = get_filenames_in_folder(subfolder)
if does_file_list_contain_terraform_files(file_list):
subfolders_with_terraform.add(subfolder)
subfolders_with_terraform = list(subfolders_with_terraform)
if update_terraform_modules:
threads = []
# First, let's run this in the target directory
# print("Running 'terraform get' on subdirectories...")
print(f"Running 'terraform get' on the root directory: {os.path.relpath(repo_directory)}")
run_tf_get_command(repo_directory, update=True)
# Sometimes there are not Terraform repos inside the main directory, but they are in nested directories.
# Let's run Terraform get for those cases as well.
print(f"Running 'terraform get' on all Terraform modules nested under the folder: {os.path.relpath(repo_directory)}")
with ThreadPoolExecutor(max_workers=thread_count) as executor:
for index, subfolder in enumerate(subfolders_with_terraform):
threads.append(executor.submit(run_tf_get_command, tf_directory=subfolder, update=True))
stuff = as_completed(threads)
for index, task in enumerate(stuff):
response, return_code, tf_dir = task.result()
if return_code > 0:
response_string = f"\texit {return_code}: {response}"
else:
response_string = "\texit 0"
print_grey_replace_output(f"\t[COMPLETED] terraform get ({index + 1}/{len(subfolders_with_terraform)}): {os.path.relpath(tf_dir)}{response_string}")
def run_tf_get_command(tf_directory: str, update: bool = False):
tf = Terraform(working_dir=tf_directory)
os.environ["TF_IN_AUTOMATION"] = "1"
if update:
update_str = "-update"
else:
update_str = None
os.environ["TF_DATA_DIR"] = ".terraform" # Workaround to https://github.com/bridgecrewio/checkov/issues/1312
return_code, stdout, stderr = tf.cmd("get", "-no-color", update_str)
return stdout, return_code, tf_directory
def purge(directory: str, pattern: str, exclude: list = None) -> None:
"""
Usage:
purge(".", "*.tfstate")
purge(".", "*.tfstate.backup")
purge(".", "*.terraform")
purge(".", FILE_PATH_WITH_SPACE_AND_SPACIAL_CHARS)
:return:
"""
if not exclude:
exclude = []
for root, dirnames, filenames in os.walk(directory):
dirnames[:] = [d for d in dirnames if d not in exclude]
for filename in fnmatch.filter(filenames, pattern):
f = os.path.join(root, filename)
os.remove(f)
for dirname in fnmatch.filter(dirnames, pattern):
d = os.path.join(root, dirname)
shutil.rmtree(d)
def get_filenames_in_folder(path: str) -> list:
"""Given a folder path, get a list of filenames in that folder."""
file_list = []
try:
for filename in os.listdir(path):
if os.path.isfile(os.path.join(path, filename)):
if filename not in file_list:
file_list.append(filename)
file_list.sort()
return file_list
except FileNotFoundError:
return []
def does_file_list_contain_terraform_files(file_list: list) -> bool:
result = False
for file in file_list:
if file.endswith(".tf"):
result = True
break
return result
def print_grey_replace_output(string):
sys.stdout.write(f"\r{GREY}{string}{END}")
sys.stdout.flush()
def print_grey(string):
print(f"{GREY}{string}{END}")
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--directory", help="Base directory containing the Terraform files")
parser.add_argument("-t", "--threads", type=int, default=3, help="Thread count")
args = parser.parse_args()
this_tf_dir = os.path.abspath(args.directory)
terraform_get(tf_directory=this_tf_dir, thread_count=args.threads, update_terraform_modules=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment